You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by pb...@apache.org on 2019/05/28 20:43:15 UTC

[phoenix] branch 4.14-HBase-1.2 updated (0a7e93d -> 6349f24)

This is an automated email from the ASF dual-hosted git repository.

pboado pushed a change to branch 4.14-HBase-1.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git.


    from 0a7e93d  PHOENIX-5055 Split mutations batches probably affects correctness of index data
     new 58083d7  PHOENIX-4296: reverse scan in ChunkedResultIterator
     new 6349f24  PHOENIX-5291 Ensure that Phoenix coprocessor close all scanners.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../UngroupedAggregateRegionObserver.java          | 17 ++++-
 .../phoenix/iterate/ChunkedResultIterator.java     | 13 +++-
 .../phoenix/iterate/ChunkedResultIteratorTest.java | 73 ++++++++++++++++++++++
 3 files changed, 97 insertions(+), 6 deletions(-)
 create mode 100644 phoenix-core/src/test/java/org/apache/phoenix/iterate/ChunkedResultIteratorTest.java


[phoenix] 01/02: PHOENIX-4296: reverse scan in ChunkedResultIterator

Posted by pb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pboado pushed a commit to branch 4.14-HBase-1.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit 58083d70e1d774aa88283fb16945eb546c0e4f27
Author: chfeng <ch...@gmail.com>
AuthorDate: Thu May 16 11:41:41 2019 +0100

    PHOENIX-4296: reverse scan in ChunkedResultIterator
---
 .../phoenix/iterate/ChunkedResultIterator.java     | 13 +++-
 .../phoenix/iterate/ChunkedResultIteratorTest.java | 73 ++++++++++++++++++++++
 2 files changed, 83 insertions(+), 3 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
index acb6c04..1aab2d5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
@@ -58,6 +58,7 @@ public class ChunkedResultIterator implements PeekingResultIterator {
 
     private final ParallelIteratorFactory delegateIteratorFactory;
     private ImmutableBytesWritable lastKey = new ImmutableBytesWritable();
+    private ImmutableBytesWritable prevLastKey = new ImmutableBytesWritable();
     private final StatementContext context;
     private final TableRef tableRef;
     private final long chunkSize;
@@ -96,8 +97,9 @@ public class ChunkedResultIterator implements PeekingResultIterator {
         }
     }
 
-    private ChunkedResultIterator(ParallelIteratorFactory delegateIteratorFactory, MutationState mutationState,
-    		StatementContext context, TableRef tableRef, Scan scan, long chunkSize, ResultIterator scanner, QueryPlan plan) throws SQLException {
+    private ChunkedResultIterator(ParallelIteratorFactory delegateIteratorFactory,
+            MutationState mutationState, StatementContext context, TableRef tableRef, Scan scan,
+            long chunkSize, ResultIterator scanner, QueryPlan plan) throws SQLException {
         this.delegateIteratorFactory = delegateIteratorFactory;
         this.context = context;
         this.tableRef = tableRef;
@@ -138,8 +140,12 @@ public class ChunkedResultIterator implements PeekingResultIterator {
         if (resultIterator.peek() == null && lastKey != null) {
             resultIterator.close();
             scan = ScanUtil.newScan(scan);
-            if(ScanUtil.isLocalIndex(scan)) {
+            if (ScanUtil.isLocalIndex(scan)) {
                 scan.setAttribute(SCAN_START_ROW_SUFFIX, ByteUtil.copyKeyBytesIfNecessary(lastKey));
+            } else if (ScanUtil.isReversed(scan)) {
+                // lastKey is the last row the previous iterator meet but not returned.
+                // for reverse scan, use prevLastKey as the new stopRow.
+                scan.setStopRow(ByteUtil.copyKeyBytesIfNecessary(prevLastKey));
             } else {
                 scan.setStartRow(ByteUtil.copyKeyBytesIfNecessary(lastKey));
             }
@@ -212,6 +218,7 @@ public class ChunkedResultIterator implements PeekingResultIterator {
             byte[] currentKey = lastKey.get();
             int offset = lastKey.getOffset();
             int length = lastKey.getLength();
+            prevLastKey.set(lastKey.copyBytes());
             newTuple.getKey(lastKey);
 
             return Bytes.compareTo(currentKey, offset, length, lastKey.get(), lastKey.getOffset(), lastKey.getLength()) != 0;
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/ChunkedResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/ChunkedResultIteratorTest.java
new file mode 100644
index 0000000..18402f0
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/ChunkedResultIteratorTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.util.TestUtil.PHOENIX_JDBC_URL;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.util.List;
+import java.util.Properties;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.schema.TableRef;
+import org.junit.Assert;
+import org.junit.Test;
+
+@SuppressWarnings("deprecated") public class ChunkedResultIteratorTest
+        extends ParallelStatsDisabledIT {
+
+    @Test
+    public void testChunked() throws Exception {
+        Properties props = new Properties();
+        props.setProperty(QueryServices.RENEW_LEASE_ENABLED, "false");
+        props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, "2");
+        Connection conn = DriverManager.getConnection(PHOENIX_JDBC_URL, props);
+        String tableName = generateUniqueName();
+
+        conn.createStatement().execute("CREATE TABLE " + tableName
+                + " (A UNSIGNED_LONG NOT NULL PRIMARY KEY, B VARCHAR(10))");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (1, 'A')");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (2, 'B')");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (3, 'C')");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (4, 'D')");
+        conn.commit();
+
+
+        String sql = "SELECT A, B FROM " + tableName + " ORDER BY A DESC";
+        PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class);
+        ResultSet rs = stmt.executeQuery(sql);
+
+        int cnt = 0;
+        while ((rs.next())) {
+            cnt++;
+            assertTrue("too many results returned", cnt <= 4);
+        }
+        assertEquals(4, cnt);
+    }
+}


[phoenix] 02/02: PHOENIX-5291 Ensure that Phoenix coprocessor close all scanners.

Posted by pb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pboado pushed a commit to branch 4.14-HBase-1.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit 6349f245e29ca54d773026a5563c43a2ab9e8264
Author: Lars Hofhansl <la...@apache.org>
AuthorDate: Thu May 23 06:40:34 2019 +0100

    PHOENIX-5291 Ensure that Phoenix coprocessor close all scanners.
---
 .../coprocessor/UngroupedAggregateRegionObserver.java   | 17 ++++++++++++++---
 1 file changed, 14 insertions(+), 3 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index f0ce5b2..72ee4a3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -1158,7 +1158,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         long rowCount = 0; // in case of async, we report 0 as number of rows updated
         StatisticsCollectionRunTracker statsRunTracker =
                 StatisticsCollectionRunTracker.getInstance(config);
-        boolean runUpdateStats = statsRunTracker.addUpdateStatsCommandRegion(region.getRegionInfo(),scan.getFamilyMap().keySet());
+        final boolean runUpdateStats = statsRunTracker.addUpdateStatsCommandRegion(region.getRegionInfo(),scan.getFamilyMap().keySet());
         if (runUpdateStats) {
             if (!async) {
                 rowCount = callable.call();
@@ -1187,8 +1187,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
 
             @Override
             public void close() throws IOException {
-                // No-op because we want to manage closing of the inner scanner ourselves.
-                // This happens inside StatsCollectionCallable.
+                // If we ran/scheduled StatsCollectionCallable the delegate
+                // scanner is closed there. Otherwise close it here.
+                if (!runUpdateStats) {
+                    super.close();
+                }
             }
 
             @Override
@@ -1425,6 +1428,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                             + fullTableName);
                                 Scan scan = new Scan();
                                 scan.setMaxVersions();
+
+                                // close the passed scanner since we are returning a brand-new one
+                                try {
+                                    if (s != null) {
+                                        s.close();
+                                    }
+                                } catch (IOException ignore) {}
+
                                 return new StoreScanner(store, store.getScanInfo(), scan, scanners,
                                     ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(),
                                     HConstants.OLDEST_TIMESTAMP);