You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by la...@apache.org on 2016/02/21 04:43:14 UTC
[48/48] phoenix git commit: PHOENIX-2697 Provide a SERIAL hint.
PHOENIX-2697 Provide a SERIAL hint.
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5c07651b
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5c07651b
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5c07651b
Branch: refs/heads/4.x-HBase-0.98
Commit: 5c07651b55dd0613e924351e10712f17498ab5e1
Parents: 5a5bccd
Author: Lars Hofhansl <la...@apache.org>
Authored: Sat Feb 20 19:36:35 2016 -0800
Committer: Lars Hofhansl <la...@apache.org>
Committed: Sat Feb 20 19:38:58 2016 -0800
----------------------------------------------------------------------
.../apache/phoenix/execute/AggregatePlan.java | 16 ++++++++++-----
.../org/apache/phoenix/execute/ScanPlan.java | 14 ++++++++-----
.../apache/phoenix/iterate/ExplainTable.java | 2 +-
.../apache/phoenix/iterate/SerialIterators.java | 3 ++-
.../java/org/apache/phoenix/parse/HintNode.java | 4 ++++
.../compile/StatementHintsCompilationTest.java | 21 +++++++++++++++-----
6 files changed, 43 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5c07651b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 3de4e68..73a995c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -48,10 +48,13 @@ import org.apache.phoenix.iterate.ParallelIterators;
import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.PeekingResultIterator;
import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.ResultIterators;
import org.apache.phoenix.iterate.SequenceResultIterator;
+import org.apache.phoenix.iterate.SerialIterators;
import org.apache.phoenix.iterate.SpoolingResultIterator;
import org.apache.phoenix.iterate.UngroupedAggregatingResultIterator;
import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
@@ -194,16 +197,19 @@ public class AggregatePlan extends BaseQueryPlan {
context.getScan().setAttribute(BaseScannerRegionObserver.GROUP_BY_LIMIT, PInteger.INSTANCE.toBytes(limit));
}
}
- ParallelIterators parallelIterators = new ParallelIterators(this, null, wrapParallelIteratorFactory());
- splits = parallelIterators.getSplits();
- scans = parallelIterators.getScans();
+ ResultIterators iterators = statement.getHint().hasHint(HintNode.Hint.SERIAL) ?
+ new SerialIterators(this, null, wrapParallelIteratorFactory(), scanGrouper) :
+ new ParallelIterators(this, null, wrapParallelIteratorFactory());
+
+ splits = iterators.getSplits();
+ scans = iterators.getScans();
AggregatingResultIterator aggResultIterator;
// No need to merge sort for ungrouped aggregation
if (groupBy.isEmpty()) {
- aggResultIterator = new UngroupedAggregatingResultIterator(new ConcatResultIterator(parallelIterators), aggregators);
+ aggResultIterator = new UngroupedAggregatingResultIterator(new ConcatResultIterator(iterators), aggregators);
} else {
- aggResultIterator = new GroupedAggregatingResultIterator(new MergeSortRowKeyResultIterator(parallelIterators), aggregators);
+ aggResultIterator = new GroupedAggregatingResultIterator(new MergeSortRowKeyResultIterator(iterators), aggregators);
}
if (having != null) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5c07651b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index f4c570c..d51e6c8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -46,6 +46,7 @@ import org.apache.phoenix.iterate.SequenceResultIterator;
import org.apache.phoenix.iterate.SerialIterators;
import org.apache.phoenix.iterate.SpoolingResultIterator;
import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryConstants;
@@ -85,7 +86,7 @@ public class ScanPlan extends BaseQueryPlan {
private ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, Expression dynamicFilter) throws SQLException {
super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, GroupBy.EMPTY_GROUP_BY,
parallelIteratorFactory != null ? parallelIteratorFactory :
- buildResultIteratorFactory(context, table, orderBy, limit, allowPageFilter), dynamicFilter);
+ buildResultIteratorFactory(context, statement, table, orderBy, limit, allowPageFilter), dynamicFilter);
this.allowPageFilter = allowPageFilter;
if (!orderBy.getOrderByExpressions().isEmpty()) { // TopN
int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
@@ -94,8 +95,11 @@ public class ScanPlan extends BaseQueryPlan {
}
}
- private static boolean isSerial(StatementContext context,
+ private static boolean isSerial(StatementContext context, FilterableStatement statement,
TableRef tableRef, OrderBy orderBy, Integer limit, boolean allowPageFilter) throws SQLException {
+ if (statement.getHint().hasHint(HintNode.Hint.SERIAL)) {
+ return true;
+ }
Scan scan = context.getScan();
/*
* If a limit is provided and we have no filter, run the scan serially when we estimate that
@@ -137,10 +141,10 @@ public class ScanPlan extends BaseQueryPlan {
return isSerial;
}
- private static ParallelIteratorFactory buildResultIteratorFactory(StatementContext context,
+ private static ParallelIteratorFactory buildResultIteratorFactory(StatementContext context, FilterableStatement statement,
TableRef table, OrderBy orderBy, Integer limit, boolean allowPageFilter) throws SQLException {
- if (isSerial(context, table, orderBy, limit, allowPageFilter)
+ if (isSerial(context, statement, table, orderBy, limit, allowPageFilter)
|| ScanUtil.isRoundRobinPossible(orderBy, context)
|| ScanUtil.isPacingScannersPossible(context)) {
return ParallelIteratorFactory.NOOP_FACTORY;
@@ -189,7 +193,7 @@ public class ScanPlan extends BaseQueryPlan {
* limit is provided, run query serially.
*/
boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
- boolean isSerial = isSerial(context, tableRef, orderBy, limit, allowPageFilter);
+ boolean isSerial = isSerial(context, statement, tableRef, orderBy, limit, allowPageFilter);
Integer perScanLimit = !allowPageFilter || isOrdered ? null : limit;
ResultIterators iterators;
if (isSerial) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5c07651b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
index beb209d..df7b4c3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
@@ -106,7 +106,7 @@ public abstract class ExplainTable {
Scan scan = context.getScan();
if (hint.hasHint(Hint.SMALL)) {
- buf.append("SMALL ");
+ buf.append(Hint.SMALL).append(" ");
}
if (OrderBy.REV_ROW_KEY_ORDER_BY.equals(orderBy)) {
buf.append("REVERSE ");
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5c07651b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index a221ba2..60b9f44 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -32,6 +32,7 @@ import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.job.JobManager.JobCallable;
import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
+import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.util.ScanUtil;
@@ -54,7 +55,7 @@ public class SerialIterators extends BaseResultIterators {
public SerialIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper)
throws SQLException {
super(plan, perScanLimit, scanGrouper);
- Preconditions.checkArgument(perScanLimit != null); // must be a limit specified
+ Preconditions.checkArgument(perScanLimit != null || plan.getStatement().getHint().hasHint(HintNode.Hint.SERIAL)); // must be a limit specified or a SERIAL hint
this.iteratorFactory = iteratorFactory;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5c07651b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
index ce20208..6d8451b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
@@ -100,6 +100,10 @@ public class HintNode {
* Saves an RPC call on the scan. See Scan.setSmall(true) in HBase documentation.
*/
SMALL,
+ /**
+ * Enforces a serial scan.
+ */
+ SERIAL,
};
private final Map<Hint,String> hints;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5c07651b/phoenix-core/src/test/java/org/apache/phoenix/compile/StatementHintsCompilationTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/StatementHintsCompilationTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/StatementHintsCompilationTest.java
index 7f8adfa..9adf414 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/StatementHintsCompilationTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/StatementHintsCompilationTest.java
@@ -61,17 +61,17 @@ public class StatementHintsCompilationTest extends BaseConnectionlessQueryTest {
return filter instanceof SkipScanFilter;
}
- private static StatementContext compileStatement(String query) throws SQLException {
+ private static QueryPlan compileStatement(String query) throws SQLException {
return compileStatement(query, Collections.emptyList(), null);
}
- private static StatementContext compileStatement(String query, List<Object> binds, Integer limit) throws SQLException {
+ private static QueryPlan compileStatement(String query, List<Object> binds, Integer limit) throws SQLException {
PhoenixConnection pconn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class);
PhoenixPreparedStatement pstmt = new PhoenixPreparedStatement(pconn, query);
TestUtil.bindParams(pstmt, binds);
QueryPlan plan = pstmt.compileQuery();
assertEquals(limit, plan.getLimit());
- return plan.getContext();
+ return plan;
}
@Test
@@ -80,7 +80,7 @@ public class StatementHintsCompilationTest extends BaseConnectionlessQueryTest {
// A where clause without the first column usually compiles into a range scan.
String query = "SELECT /*+ SKIP_SCAN */ * FROM atable WHERE entity_id='" + id + "'";
- Scan scan = compileStatement(query).getScan();
+ Scan scan = compileStatement(query).getContext().getScan();
assertTrue("The first filter should be SkipScanFilter.", usingSkipScan(scan));
}
@@ -88,7 +88,7 @@ public class StatementHintsCompilationTest extends BaseConnectionlessQueryTest {
public void testSelectForceRangeScan() throws Exception {
String query = "SELECT /*+ RANGE_SCAN */ * FROM atable WHERE organization_id in (" +
"'000000000000001', '000000000000002', '000000000000003', '000000000000004')";
- Scan scan = compileStatement(query).getScan();
+ Scan scan = compileStatement(query).getContext().getScan();
// Verify that it is not using SkipScanFilter.
assertFalse("The first filter should not be SkipScanFilter.", usingSkipScan(scan));
}
@@ -103,4 +103,15 @@ public class StatementHintsCompilationTest extends BaseConnectionlessQueryTest {
" SERVER TOP 100 ROWS SORTED BY [ORGANIZATION_ID, PARENT_ID, CREATED_DATE DESC, ENTITY_HISTORY_ID]\n" +
"CLIENT MERGE SORT",QueryUtil.getExplainPlan(rs));
}
+
+ @Test
+ public void testSerialHint() throws Exception {
+ // test ScanPlan
+ String query = "SELECT /*+ SERIAL */ COUNT(*) FROM atable";
+ assertTrue("Expected a SERIAL query", compileStatement(query).getExplainPlan().getPlanSteps().get(0).contains("SERIAL"));
+
+ // test AggregatePlan
+ query = "SELECT /*+ SERIAL */ * FROM atable";
+ assertTrue("Expected a SERIAL query", compileStatement(query).getExplainPlan().getPlanSteps().get(0).contains("SERIAL"));
+ }
}