You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2016/02/24 22:14:03 UTC

[45/50] [abbrv] phoenix git commit: PHOENIX-2697 Provide a SERIAL hint.

PHOENIX-2697 Provide a SERIAL hint.


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/0b1a180f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/0b1a180f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/0b1a180f

Branch: refs/heads/calcite
Commit: 0b1a180f1d9acc23cf58ecfc84a67aac110160cd
Parents: 61fa462
Author: Lars Hofhansl <la...@apache.org>
Authored: Sat Feb 20 19:36:35 2016 -0800
Committer: Lars Hofhansl <la...@apache.org>
Committed: Sat Feb 20 19:36:35 2016 -0800

----------------------------------------------------------------------
 .../apache/phoenix/execute/AggregatePlan.java   | 16 ++++++++++-----
 .../org/apache/phoenix/execute/ScanPlan.java    | 14 ++++++++-----
 .../apache/phoenix/iterate/ExplainTable.java    |  2 +-
 .../apache/phoenix/iterate/SerialIterators.java |  3 ++-
 .../java/org/apache/phoenix/parse/HintNode.java |  4 ++++
 .../compile/StatementHintsCompilationTest.java  | 21 +++++++++++++++-----
 6 files changed, 43 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b1a180f/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 3de4e68..73a995c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -48,10 +48,13 @@ import org.apache.phoenix.iterate.ParallelIterators;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.PeekingResultIterator;
 import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.ResultIterators;
 import org.apache.phoenix.iterate.SequenceResultIterator;
+import org.apache.phoenix.iterate.SerialIterators;
 import org.apache.phoenix.iterate.SpoolingResultIterator;
 import org.apache.phoenix.iterate.UngroupedAggregatingResultIterator;
 import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
@@ -194,16 +197,19 @@ public class AggregatePlan extends BaseQueryPlan {
                 context.getScan().setAttribute(BaseScannerRegionObserver.GROUP_BY_LIMIT, PInteger.INSTANCE.toBytes(limit));
             }
         }
-        ParallelIterators parallelIterators = new ParallelIterators(this, null, wrapParallelIteratorFactory());
-        splits = parallelIterators.getSplits();
-        scans = parallelIterators.getScans();
+        ResultIterators iterators = statement.getHint().hasHint(HintNode.Hint.SERIAL) ?
+                new SerialIterators(this, null, wrapParallelIteratorFactory(), scanGrouper) :
+                new ParallelIterators(this, null, wrapParallelIteratorFactory());
+
+        splits = iterators.getSplits();
+        scans = iterators.getScans();
 
         AggregatingResultIterator aggResultIterator;
         // No need to merge sort for ungrouped aggregation
         if (groupBy.isEmpty()) {
-            aggResultIterator = new UngroupedAggregatingResultIterator(new ConcatResultIterator(parallelIterators), aggregators);
+            aggResultIterator = new UngroupedAggregatingResultIterator(new ConcatResultIterator(iterators), aggregators);
         } else {
-            aggResultIterator = new GroupedAggregatingResultIterator(new MergeSortRowKeyResultIterator(parallelIterators), aggregators);
+            aggResultIterator = new GroupedAggregatingResultIterator(new MergeSortRowKeyResultIterator(iterators), aggregators);
         }
 
         if (having != null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b1a180f/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index f4c570c..d51e6c8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -46,6 +46,7 @@ import org.apache.phoenix.iterate.SequenceResultIterator;
 import org.apache.phoenix.iterate.SerialIterators;
 import org.apache.phoenix.iterate.SpoolingResultIterator;
 import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
@@ -85,7 +86,7 @@ public class ScanPlan extends BaseQueryPlan {
     private ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, Expression dynamicFilter) throws SQLException {
         super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, GroupBy.EMPTY_GROUP_BY,
                 parallelIteratorFactory != null ? parallelIteratorFactory :
-                        buildResultIteratorFactory(context, table, orderBy, limit, allowPageFilter), dynamicFilter);
+                        buildResultIteratorFactory(context, statement, table, orderBy, limit, allowPageFilter), dynamicFilter);
         this.allowPageFilter = allowPageFilter;
         if (!orderBy.getOrderByExpressions().isEmpty()) { // TopN
             int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
@@ -94,8 +95,11 @@ public class ScanPlan extends BaseQueryPlan {
         }
     }
 
-    private static boolean isSerial(StatementContext context,
+    private static boolean isSerial(StatementContext context, FilterableStatement statement,
             TableRef tableRef, OrderBy orderBy, Integer limit, boolean allowPageFilter) throws SQLException {
+        if (statement.getHint().hasHint(HintNode.Hint.SERIAL)) {
+            return true;
+        }
         Scan scan = context.getScan();
         /*
          * If a limit is provided and we have no filter, run the scan serially when we estimate that
@@ -137,10 +141,10 @@ public class ScanPlan extends BaseQueryPlan {
         return isSerial;
     }
     
-    private static ParallelIteratorFactory buildResultIteratorFactory(StatementContext context,
+    private static ParallelIteratorFactory buildResultIteratorFactory(StatementContext context, FilterableStatement statement,
             TableRef table, OrderBy orderBy, Integer limit, boolean allowPageFilter) throws SQLException {
 
-        if (isSerial(context, table, orderBy, limit, allowPageFilter)
+        if (isSerial(context, statement, table, orderBy, limit, allowPageFilter)
                 || ScanUtil.isRoundRobinPossible(orderBy, context)
                 || ScanUtil.isPacingScannersPossible(context)) {
             return ParallelIteratorFactory.NOOP_FACTORY;
@@ -189,7 +193,7 @@ public class ScanPlan extends BaseQueryPlan {
          * limit is provided, run query serially.
          */
         boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
-        boolean isSerial = isSerial(context, tableRef, orderBy, limit, allowPageFilter);
+        boolean isSerial = isSerial(context, statement, tableRef, orderBy, limit, allowPageFilter);
         Integer perScanLimit = !allowPageFilter || isOrdered ? null : limit;
         ResultIterators iterators;
         if (isSerial) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b1a180f/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
index 4a71483..b319914 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
@@ -110,7 +110,7 @@ public abstract class ExplainTable {
             buf.append("TIMELINE-CONSISTENCY ");
         }
         if (hint.hasHint(Hint.SMALL)) {
-            buf.append("SMALL ");
+            buf.append(Hint.SMALL).append(" ");
         }
         if (OrderBy.REV_ROW_KEY_ORDER_BY.equals(orderBy)) {
             buf.append("REVERSE ");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b1a180f/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index a221ba2..60b9f44 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -32,6 +32,7 @@ import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.job.JobManager.JobCallable;
 import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
+import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.ScanUtil;
 
@@ -54,7 +55,7 @@ public class SerialIterators extends BaseResultIterators {
     public SerialIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper)
             throws SQLException {
         super(plan, perScanLimit, scanGrouper);
-        Preconditions.checkArgument(perScanLimit != null); // must be a limit specified
+        Preconditions.checkArgument(perScanLimit != null || plan.getStatement().getHint().hasHint(HintNode.Hint.SERIAL)); // must be a limit specified or a SERIAL hint
         this.iteratorFactory = iteratorFactory;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b1a180f/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
index ce20208..6d8451b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
@@ -100,6 +100,10 @@ public class HintNode {
        * Saves an RPC call on the scan. See Scan.setSmall(true) in HBase documentation.
        */
      SMALL,
+     /**
+      * Enforces a serial scan.
+      */
+     SERIAL,
     };
 
     private final Map<Hint,String> hints;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b1a180f/phoenix-core/src/test/java/org/apache/phoenix/compile/StatementHintsCompilationTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/StatementHintsCompilationTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/StatementHintsCompilationTest.java
index 7f8adfa..9adf414 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/StatementHintsCompilationTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/StatementHintsCompilationTest.java
@@ -61,17 +61,17 @@ public class StatementHintsCompilationTest extends BaseConnectionlessQueryTest {
         return filter instanceof SkipScanFilter;
     }
 
-    private static StatementContext compileStatement(String query) throws SQLException {
+    private static QueryPlan compileStatement(String query) throws SQLException {
         return compileStatement(query, Collections.emptyList(), null);
     }
 
-    private static StatementContext compileStatement(String query, List<Object> binds, Integer limit) throws SQLException {
+    private static QueryPlan compileStatement(String query, List<Object> binds, Integer limit) throws SQLException {
         PhoenixConnection pconn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class);
         PhoenixPreparedStatement pstmt = new PhoenixPreparedStatement(pconn, query);
         TestUtil.bindParams(pstmt, binds);
         QueryPlan plan = pstmt.compileQuery();
         assertEquals(limit, plan.getLimit());
-        return plan.getContext();
+        return plan;
     }
 
     @Test
@@ -80,7 +80,7 @@ public class StatementHintsCompilationTest extends BaseConnectionlessQueryTest {
         // A where clause without the first column usually compiles into a range scan.
         String query = "SELECT /*+ SKIP_SCAN */ * FROM atable WHERE entity_id='" + id + "'";
         
-        Scan scan = compileStatement(query).getScan();
+        Scan scan = compileStatement(query).getContext().getScan();
         assertTrue("The first filter should be SkipScanFilter.", usingSkipScan(scan));
     }
 
@@ -88,7 +88,7 @@ public class StatementHintsCompilationTest extends BaseConnectionlessQueryTest {
     public void testSelectForceRangeScan() throws Exception {
         String query = "SELECT /*+ RANGE_SCAN */ * FROM atable WHERE organization_id in (" +
                 "'000000000000001', '000000000000002', '000000000000003', '000000000000004')";
-        Scan scan = compileStatement(query).getScan();
+        Scan scan = compileStatement(query).getContext().getScan();
         // Verify that it is not using SkipScanFilter.
         assertFalse("The first filter should not be SkipScanFilter.", usingSkipScan(scan));
     }
@@ -103,4 +103,15 @@ public class StatementHintsCompilationTest extends BaseConnectionlessQueryTest {
                 "    SERVER TOP 100 ROWS SORTED BY [ORGANIZATION_ID, PARENT_ID, CREATED_DATE DESC, ENTITY_HISTORY_ID]\n" + 
                 "CLIENT MERGE SORT",QueryUtil.getExplainPlan(rs));
     }
+
+    @Test
+    public void testSerialHint() throws Exception {
+        // test AggregatePlan (COUNT(*) is an aggregate query)
+        String query = "SELECT /*+ SERIAL */ COUNT(*) FROM atable";
+        assertTrue("Expected a SERIAL query", compileStatement(query).getExplainPlan().getPlanSteps().get(0).contains("SERIAL"));
+
+        // test ScanPlan (plain, non-aggregate SELECT)
+        query = "SELECT /*+ SERIAL */ * FROM atable";
+        assertTrue("Expected a SERIAL query", compileStatement(query).getExplainPlan().getPlanSteps().get(0).contains("SERIAL"));
+    }
 }