You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by td...@apache.org on 2015/07/01 09:43:37 UTC
[2/3] phoenix git commit: PHOENIX-2075 MR integration uses single
mapper unless table is salted
PHOENIX-2075 MR integration uses single mapper unless table is salted
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6a07d45a
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6a07d45a
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6a07d45a
Branch: refs/heads/master
Commit: 6a07d45a76d6c07666b777676627ebc313d0d7b5
Parents: d2392be
Author: Thomas D'Silva <td...@salesforce.com>
Authored: Fri Jun 26 14:54:31 2015 -0700
Committer: Thomas D'Silva <td...@salesforce.com>
Committed: Tue Jun 30 22:13:59 2015 -0700
----------------------------------------------------------------------
.../org/apache/phoenix/compile/QueryPlan.java | 3 +
.../apache/phoenix/compile/TraceQueryPlan.java | 9 ++-
.../apache/phoenix/execute/AggregatePlan.java | 3 +-
.../apache/phoenix/execute/BaseQueryPlan.java | 14 +++--
.../phoenix/execute/ClientAggregatePlan.java | 5 +-
.../phoenix/execute/ClientProcessingPlan.java | 9 +++
.../apache/phoenix/execute/ClientScanPlan.java | 5 +-
.../phoenix/execute/DegenerateQueryPlan.java | 3 +-
.../apache/phoenix/execute/HashJoinPlan.java | 11 +++-
.../org/apache/phoenix/execute/ScanPlan.java | 7 ++-
.../phoenix/execute/SortMergeJoinPlan.java | 13 +++-
.../phoenix/execute/TupleProjectionPlan.java | 11 +++-
.../org/apache/phoenix/execute/UnionPlan.java | 6 ++
.../phoenix/iterate/BaseResultIterators.java | 24 ++------
.../iterate/DefaultParallelScanGrouper.java | 62 ++++++++++++++++++++
.../iterate/MapReduceParallelScanGrouper.java | 45 ++++++++++++++
.../phoenix/iterate/ParallelIterators.java | 9 ++-
.../phoenix/iterate/ParallelScanGrouper.java | 41 +++++++++++++
.../apache/phoenix/iterate/SerialIterators.java | 4 +-
.../apache/phoenix/jdbc/PhoenixStatement.java | 7 +++
.../phoenix/mapreduce/PhoenixInputFormat.java | 6 +-
.../phoenix/mapreduce/PhoenixInputSplit.java | 1 +
.../query/ParallelIteratorsSplitTest.java | 6 ++
23 files changed, 259 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
index d0c63fa..1c0c469 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.query.KeyRange;
@@ -46,6 +47,8 @@ public interface QueryPlan extends StatementPlan {
*/
public ResultIterator iterator() throws SQLException;
+ public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException;
+
public long getEstimatedSize();
// TODO: change once joins are supported
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
index 11377de..93a2da0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
@@ -38,6 +38,8 @@ import org.apache.phoenix.expression.Determinism;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.LiteralExpression;
import org.apache.phoenix.expression.RowKeyColumnExpression;
+import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
@@ -105,9 +107,14 @@ public class TraceQueryPlan implements QueryPlan {
public ExplainPlan getExplainPlan() throws SQLException {
return ExplainPlan.EMPTY_PLAN;
}
-
+
@Override
public ResultIterator iterator() throws SQLException {
+ return iterator(DefaultParallelScanGrouper.getInstance());
+ }
+
+ @Override
+ public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
final PhoenixConnection conn = stmt.getConnection();
if (conn.getTraceScope() == null && !traceStatement.isTraceOn()) {
return ResultIterator.EMPTY_ITERATOR;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 00e843d..67222d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -45,6 +45,7 @@ import org.apache.phoenix.iterate.OrderedAggregatingResultIterator;
import org.apache.phoenix.iterate.OrderedResultIterator;
import org.apache.phoenix.iterate.ParallelIteratorFactory;
import org.apache.phoenix.iterate.ParallelIterators;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.PeekingResultIterator;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.iterate.SequenceResultIterator;
@@ -141,7 +142,7 @@ public class AggregatePlan extends BaseQueryPlan {
}
@Override
- protected ResultIterator newIterator() throws SQLException {
+ protected ResultIterator newIterator(ParallelScanGrouper scanGrouper) throws SQLException {
if (groupBy.isEmpty()) {
UngroupedAggregateRegionObserver.serializeIntoScan(context.getScan());
} else {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index 8b6de1d..37b73c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -46,8 +46,10 @@ import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.expression.ProjectedColumnExpression;
import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
import org.apache.phoenix.iterate.DelegateResultIterator;
import org.apache.phoenix.iterate.ParallelIteratorFactory;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.parse.FilterableStatement;
@@ -155,11 +157,15 @@ public abstract class BaseQueryPlan implements QueryPlan {
// }
@Override
+ public final ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
+ return iterator(Collections.<SQLCloseable>emptyList(), scanGrouper);
+ }
+
public final ResultIterator iterator() throws SQLException {
- return iterator(Collections.<SQLCloseable>emptyList());
+ return iterator(Collections.<SQLCloseable>emptyList(), DefaultParallelScanGrouper.getInstance());
}
- public final ResultIterator iterator(final List<? extends SQLCloseable> dependencies) throws SQLException {
+ public final ResultIterator iterator(final List<? extends SQLCloseable> dependencies, ParallelScanGrouper scanGrouper) throws SQLException {
if (context.getScanRanges() == ScanRanges.NOTHING) {
return ResultIterator.EMPTY_ITERATOR;
}
@@ -235,7 +241,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
LOG.debug(LogUtil.addCustomAnnotations("Scan ready for iteration: " + scan, connection));
}
- ResultIterator iterator = newIterator();
+ ResultIterator iterator = newIterator(scanGrouper);
iterator = dependencies.isEmpty() ?
iterator : new DelegateResultIterator(iterator) {
@Override
@@ -361,7 +367,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
}
}
- abstract protected ResultIterator newIterator() throws SQLException;
+ abstract protected ResultIterator newIterator(ParallelScanGrouper scanGrouper) throws SQLException;
@Override
public long getEstimatedSize() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
index 30adbe9..3df0447 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
@@ -48,6 +48,7 @@ import org.apache.phoenix.iterate.LimitingResultIterator;
import org.apache.phoenix.iterate.LookAheadResultIterator;
import org.apache.phoenix.iterate.OrderedAggregatingResultIterator;
import org.apache.phoenix.iterate.OrderedResultIterator;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.PeekingResultIterator;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.iterate.SequenceResultIterator;
@@ -80,8 +81,8 @@ public class ClientAggregatePlan extends ClientProcessingPlan {
}
@Override
- public ResultIterator iterator() throws SQLException {
- ResultIterator iterator = delegate.iterator();
+ public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
+ ResultIterator iterator = delegate.iterator(scanGrouper);
if (where != null) {
iterator = new FilterResultIterator(iterator, where);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
index 8e787b4..b189933 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
@@ -17,11 +17,15 @@
*/
package org.apache.phoenix.execute;
+import java.sql.SQLException;
+
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
+import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.schema.TableRef;
@@ -79,4 +83,9 @@ public abstract class ClientProcessingPlan extends DelegateQueryPlan {
public FilterableStatement getStatement() {
return statement;
}
+
+ @Override
+ public ResultIterator iterator() throws SQLException {
+ return iterator(DefaultParallelScanGrouper.getInstance());
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
index 01fbd11..4bf1889 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
@@ -29,6 +29,7 @@ import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.iterate.FilterResultIterator;
import org.apache.phoenix.iterate.LimitingResultIterator;
import org.apache.phoenix.iterate.OrderedResultIterator;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.iterate.SequenceResultIterator;
import org.apache.phoenix.parse.FilterableStatement;
@@ -49,8 +50,8 @@ public class ClientScanPlan extends ClientProcessingPlan {
}
@Override
- public ResultIterator iterator() throws SQLException {
- ResultIterator iterator = delegate.iterator();
+ public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
+ ResultIterator iterator = delegate.iterator(scanGrouper);
if (where != null) {
iterator = new FilterResultIterator(iterator, where);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
index fda53ea..98eb2dd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
@@ -27,6 +27,7 @@ import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
import org.apache.phoenix.parse.FilterableStatement;
@@ -51,7 +52,7 @@ public class DegenerateQueryPlan extends BaseQueryPlan {
}
@Override
- protected ResultIterator newIterator() throws SQLException {
+ protected ResultIterator newIterator(ParallelScanGrouper scanGrouper) throws SQLException {
return null;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 57fa25a..05ef1ec 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -49,7 +49,9 @@ import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.InListExpression;
import org.apache.phoenix.expression.LiteralExpression;
import org.apache.phoenix.expression.RowValueConstructorExpression;
+import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
import org.apache.phoenix.iterate.FilterResultIterator;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.job.JobManager.JobCallable;
@@ -113,9 +115,14 @@ public class HashJoinPlan extends DelegateQueryPlan {
this.subPlans = subPlans;
this.recompileWhereClause = recompileWhereClause;
}
-
+
@Override
public ResultIterator iterator() throws SQLException {
+ return iterator(DefaultParallelScanGrouper.getInstance());
+ }
+
+ @Override
+ public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
int count = subPlans.length;
PhoenixConnection connection = getContext().getConnection();
ConnectionQueryServices services = connection.getQueryServices();
@@ -191,7 +198,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
HashJoinInfo.serializeHashJoinIntoScan(scan, joinInfo);
}
- ResultIterator iterator = joinInfo == null ? delegate.iterator() : ((BaseQueryPlan) delegate).iterator(dependencies);
+ ResultIterator iterator = joinInfo == null ? delegate.iterator(scanGrouper) : ((BaseQueryPlan) delegate).iterator(dependencies, scanGrouper);
if (statement.getInnerSelectStatement() != null && postFilter != null) {
iterator = new FilterResultIterator(iterator, postFilter);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 884d835..b9dd2f2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -38,6 +38,7 @@ import org.apache.phoenix.iterate.MergeSortRowKeyResultIterator;
import org.apache.phoenix.iterate.MergeSortTopNResultIterator;
import org.apache.phoenix.iterate.ParallelIteratorFactory;
import org.apache.phoenix.iterate.ParallelIterators;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.iterate.ResultIterators;
import org.apache.phoenix.iterate.RoundRobinResultIterator;
@@ -161,7 +162,7 @@ public class ScanPlan extends BaseQueryPlan {
}
@Override
- protected ResultIterator newIterator() throws SQLException {
+ protected ResultIterator newIterator(ParallelScanGrouper scanGrouper) throws SQLException {
// Set any scan attributes before creating the scanner, as it will be too late afterwards
Scan scan = context.getScan();
scan.setAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY, QueryConstants.TRUE);
@@ -177,9 +178,9 @@ public class ScanPlan extends BaseQueryPlan {
Integer perScanLimit = !allowPageFilter || isOrdered ? null : limit;
ResultIterators iterators;
if (isSerial) {
- iterators = new SerialIterators(this, perScanLimit, parallelIteratorFactory);
+ iterators = new SerialIterators(this, perScanLimit, parallelIteratorFactory, scanGrouper);
} else {
- iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory);
+ iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory, scanGrouper);
}
splits = iterators.getSplits();
scans = iterators.getScans();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index 46ade33..1bbda07 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -44,7 +44,9 @@ import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
import org.apache.phoenix.iterate.MappedByteBufferQueue;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
import org.apache.phoenix.parse.FilterableStatement;
@@ -114,10 +116,15 @@ public class SortMergeJoinPlan implements QueryPlan {
}
@Override
- public ResultIterator iterator() throws SQLException {
+ public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
return type == JoinType.Semi || type == JoinType.Anti ?
- new SemiAntiJoinIterator(lhsPlan.iterator(), rhsPlan.iterator()) :
- new BasicJoinIterator(lhsPlan.iterator(), rhsPlan.iterator());
+ new SemiAntiJoinIterator(lhsPlan.iterator(scanGrouper), rhsPlan.iterator(scanGrouper)) :
+ new BasicJoinIterator(lhsPlan.iterator(scanGrouper), rhsPlan.iterator(scanGrouper));
+ }
+
+ @Override
+ public ResultIterator iterator() throws SQLException {
+ return iterator(DefaultParallelScanGrouper.getInstance());
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
index c9cbd15..e8d9af0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
@@ -23,8 +23,10 @@ import java.util.List;
import org.apache.phoenix.compile.ExplainPlan;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
import org.apache.phoenix.iterate.DelegateResultIterator;
import org.apache.phoenix.iterate.FilterResultIterator;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.schema.tuple.Tuple;
@@ -50,10 +52,15 @@ public class TupleProjectionPlan extends DelegateQueryPlan {
return new ExplainPlan(planSteps);
}
-
+
@Override
public ResultIterator iterator() throws SQLException {
- ResultIterator iterator = new DelegateResultIterator(delegate.iterator()) {
+ return iterator(DefaultParallelScanGrouper.getInstance());
+ }
+
+ @Override
+ public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
+ ResultIterator iterator = new DelegateResultIterator(delegate.iterator(scanGrouper)) {
@Override
public Tuple next() throws SQLException {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
index 2bed3a0..53745fe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
@@ -34,6 +34,7 @@ import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.iterate.ConcatResultIterator;
import org.apache.phoenix.iterate.LimitingResultIterator;
import org.apache.phoenix.iterate.MergeSortTopNResultIterator;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.iterate.UnionResultIterators;
import org.apache.phoenix.parse.FilterableStatement;
@@ -123,6 +124,11 @@ public class UnionPlan implements QueryPlan {
}
@Override
+ public final ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
+ return iterator(Collections.<SQLCloseable>emptyList());
+ }
+
+ @Override
public final ResultIterator iterator() throws SQLException {
return iterator(Collections.<SQLCloseable>emptyList());
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 43731cb..cf66d93 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -103,6 +103,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
private final byte[] physicalTableName;
private final QueryPlan plan;
protected final String scanId;
+ private final ParallelScanGrouper scanGrouper;
// TODO: too much nesting here - breakup into new classes.
private final List<List<List<Pair<Scan,Future<PeekingResultIterator>>>>> allFutures;
@@ -133,9 +134,10 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
return true;
}
- public BaseResultIterators(QueryPlan plan, Integer perScanLimit) throws SQLException {
+ public BaseResultIterators(QueryPlan plan, Integer perScanLimit, ParallelScanGrouper scanGrouper) throws SQLException {
super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), plan.getOrderBy(), plan.getStatement().getHint(), plan.getLimit());
this.plan = plan;
+ this.scanGrouper = scanGrouper;
StatementContext context = plan.getContext();
TableRef tableRef = plan.getTableRef();
PTable table = tableRef.getTable();
@@ -371,24 +373,11 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
}
private List<Scan> addNewScan(List<List<Scan>> parallelScans, List<Scan> scans, Scan scan, byte[] startKey, boolean crossedRegionBoundary) {
- PTable table = getTable();
- boolean startNewScanList = false;
- if (!plan.isRowKeyOrdered()) {
- startNewScanList = true;
- } else if (crossedRegionBoundary) {
- if (table.getIndexType() == IndexType.LOCAL) {
- startNewScanList = true;
- } else if (table.getBucketNum() != null) {
- startNewScanList = scans.isEmpty() ||
- ScanUtil.crossesPrefixBoundary(startKey,
- ScanUtil.getPrefix(scans.get(scans.size()-1).getStartRow(), SaltingUtil.NUM_SALTING_BYTES),
- SaltingUtil.NUM_SALTING_BYTES);
- }
- }
+ boolean startNewScan = scanGrouper.shouldStartNewScan(plan, scans, startKey, crossedRegionBoundary);
if (scan != null) {
- scans.add(scan);
+ scans.add(scan);
}
- if (startNewScanList && !scans.isEmpty()) {
+ if (startNewScan && !scans.isEmpty()) {
parallelScans.add(scans);
scans = Lists.newArrayListWithExpectedSize(1);
}
@@ -410,7 +399,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
Scan scan = context.getScan();
List<HRegionLocation> regionLocations = context.getConnection().getQueryServices()
.getAllTableRegions(physicalTableName);
-
List<byte[]> regionBoundaries = toBoundaries(regionLocations);
ScanRanges scanRanges = context.getScanRanges();
PTable table = getTable();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java
new file mode 100644
index 0000000..5c7136f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.util.ScanUtil;
+
+/**
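+ * Default implementation that starts a new scan group whenever a plan is not row key ordered. For a row key ordered plan (which requires a merge sort),
+ * a new scan group is started only if a scan crosses a region boundary and the table is a local index, or is salted and the scan crosses a salt prefix boundary.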
+ */
+public class DefaultParallelScanGrouper implements ParallelScanGrouper {
+
+ private static final DefaultParallelScanGrouper INSTANCE = new DefaultParallelScanGrouper();
+
+ public static DefaultParallelScanGrouper getInstance() {
+ return INSTANCE;
+ }
+
+ private DefaultParallelScanGrouper() {}
+
+ @Override
+ public boolean shouldStartNewScan(QueryPlan plan, List<Scan> scans, byte[] startKey, boolean crossedRegionBoundary) {
+ PTable table = plan.getTableRef().getTable();
+ boolean startNewScanGroup = false;
+ if (!plan.isRowKeyOrdered()) {
+ startNewScanGroup = true;
+ } else if (crossedRegionBoundary) {
+ if (table.getIndexType() == IndexType.LOCAL) {
+ startNewScanGroup = true;
+ } else if (table.getBucketNum() != null) {
+ startNewScanGroup = scans.isEmpty() ||
+ ScanUtil.crossesPrefixBoundary(startKey,
+ ScanUtil.getPrefix(scans.get(scans.size()-1).getStartRow(), SaltingUtil.NUM_SALTING_BYTES),
+ SaltingUtil.NUM_SALTING_BYTES);
+ }
+ }
+ return startNewScanGroup;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
new file mode 100644
index 0000000..bf2666d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.QueryPlan;
+
+/**
+ * Scan grouper that starts a new scan group if a plan is not row key ordered or if a
+ * scan crosses a region boundary.
+ */
+public class MapReduceParallelScanGrouper implements ParallelScanGrouper {
+
+ private static final MapReduceParallelScanGrouper INSTANCE = new MapReduceParallelScanGrouper();
+
+ public static MapReduceParallelScanGrouper getInstance() {
+ return INSTANCE;
+ }
+
+ private MapReduceParallelScanGrouper() {}
+
+ @Override
+ public boolean shouldStartNewScan(QueryPlan plan, List<Scan> scans,
+ byte[] startKey, boolean crossedRegionBoundary) {
+ return !plan.isRowKeyOrdered() || crossedRegionBoundary;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index 2dfbfe3..87f8335 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -54,11 +54,16 @@ public class ParallelIterators extends BaseResultIterators {
private static final String NAME = "PARALLEL";
private final ParallelIteratorFactory iteratorFactory;
- public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory)
+ public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper)
throws SQLException {
- super(plan, perScanLimit);
+ super(plan, perScanLimit, scanGrouper);
this.iteratorFactory = iteratorFactory;
}
+
+ public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory)
+ throws SQLException {
+ this(plan, perScanLimit, iteratorFactory, DefaultParallelScanGrouper.getInstance());
+ }
@Override
protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScanGrouper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScanGrouper.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScanGrouper.java
new file mode 100644
index 0000000..0becf4f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScanGrouper.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.QueryPlan;
+
+/**
+ * Interface for a parallel scan grouper
+ */
+public interface ParallelScanGrouper {
+
+ /**
+ * Determines whether to create a new group of parallel scans.
+ *
+ * @param plan current query plan
+ * @param scans current scan group
+ * @param startKey start key of scan
+ * @param crossedRegionBoundary whether we crossed a region boundary
+ * @return true if we should create a new group of scans
+ */
+ boolean shouldStartNewScan(QueryPlan plan, List<Scan> scans, byte[] startKey, boolean crossedRegionBoundary);
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index 516d73e..fa18c83 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -51,9 +51,9 @@ public class SerialIterators extends BaseResultIterators {
private static final String NAME = "SERIAL";
private final ParallelIteratorFactory iteratorFactory;
- public SerialIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory)
+ public SerialIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper)
throws SQLException {
- super(plan, perScanLimit);
+ super(plan, perScanLimit, scanGrouper);
Preconditions.checkArgument(perScanLimit != null); // must be a limit specified
this.iteratorFactory = iteratorFactory;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index c6c5b0c..2bb3b92 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -71,6 +71,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.expression.RowKeyColumnExpression;
import org.apache.phoenix.iterate.MaterializedResultIterator;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.parse.AddColumnStatement;
import org.apache.phoenix.parse.AliasedNode;
@@ -446,6 +447,11 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
public ResultIterator iterator() throws SQLException {
return iterator;
}
+
+ @Override
+ public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
+ return iterator;
+ }
@Override
public long getEstimatedSize() {
@@ -511,6 +517,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
public boolean useRoundRobinIterator() throws SQLException {
return false;
}
+
};
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
index 31759b4..8ee1634 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
@@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -36,6 +37,7 @@ import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
@@ -112,10 +114,10 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
Preconditions.checkNotNull(selectStatement);
final Statement statement = connection.createStatement();
final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
- // Optimize the query plan so that we potentially use secondary indexes
+ // Optimize the query plan so that we potentially use secondary indexes
final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
// Initialize the query plan so it sets up the parallel scans
- queryPlan.iterator();
+ queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
return queryPlan;
} catch (Exception exception) {
LOG.error(String.format("Failed to get the query plan with error [%s]",
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
index b222fc9..caee3cd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.InputSplit;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index ecb088a..ad65373 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -44,6 +44,7 @@ import org.apache.phoenix.compile.SequenceManager;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.iterate.ParallelIterators;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.iterate.SpoolingResultIterator;
import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -368,6 +369,11 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest {
}
@Override
+ public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
+ return ResultIterator.EMPTY_ITERATOR;
+ }
+
+ @Override
public ResultIterator iterator() throws SQLException {
return ResultIterator.EMPTY_ITERATOR;
}