You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/07/02 23:01:47 UTC

[44/47] phoenix git commit: PHOENIX-2075 MR integration uses single mapper unless table is salted

PHOENIX-2075 MR integration uses single mapper unless table is salted


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6a07d45a
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6a07d45a
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6a07d45a

Branch: refs/heads/calcite
Commit: 6a07d45a76d6c07666b777676627ebc313d0d7b5
Parents: d2392be
Author: Thomas D'Silva <td...@salesforce.com>
Authored: Fri Jun 26 14:54:31 2015 -0700
Committer: Thomas D'Silva <td...@salesforce.com>
Committed: Tue Jun 30 22:13:59 2015 -0700

----------------------------------------------------------------------
 .../org/apache/phoenix/compile/QueryPlan.java   |  3 +
 .../apache/phoenix/compile/TraceQueryPlan.java  |  9 ++-
 .../apache/phoenix/execute/AggregatePlan.java   |  3 +-
 .../apache/phoenix/execute/BaseQueryPlan.java   | 14 +++--
 .../phoenix/execute/ClientAggregatePlan.java    |  5 +-
 .../phoenix/execute/ClientProcessingPlan.java   |  9 +++
 .../apache/phoenix/execute/ClientScanPlan.java  |  5 +-
 .../phoenix/execute/DegenerateQueryPlan.java    |  3 +-
 .../apache/phoenix/execute/HashJoinPlan.java    | 11 +++-
 .../org/apache/phoenix/execute/ScanPlan.java    |  7 ++-
 .../phoenix/execute/SortMergeJoinPlan.java      | 13 +++-
 .../phoenix/execute/TupleProjectionPlan.java    | 11 +++-
 .../org/apache/phoenix/execute/UnionPlan.java   |  6 ++
 .../phoenix/iterate/BaseResultIterators.java    | 24 ++------
 .../iterate/DefaultParallelScanGrouper.java     | 62 ++++++++++++++++++++
 .../iterate/MapReduceParallelScanGrouper.java   | 45 ++++++++++++++
 .../phoenix/iterate/ParallelIterators.java      |  9 ++-
 .../phoenix/iterate/ParallelScanGrouper.java    | 41 +++++++++++++
 .../apache/phoenix/iterate/SerialIterators.java |  4 +-
 .../apache/phoenix/jdbc/PhoenixStatement.java   |  7 +++
 .../phoenix/mapreduce/PhoenixInputFormat.java   |  6 +-
 .../phoenix/mapreduce/PhoenixInputSplit.java    |  1 +
 .../query/ParallelIteratorsSplitTest.java       |  6 ++
 23 files changed, 259 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
index d0c63fa..1c0c469 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.query.KeyRange;
@@ -46,6 +47,8 @@ public interface QueryPlan extends StatementPlan {
      */
     public ResultIterator iterator() throws SQLException;
     
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException;
+    
     public long getEstimatedSize();
     
     // TODO: change once joins are supported

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
index 11377de..93a2da0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
@@ -38,6 +38,8 @@ import org.apache.phoenix.expression.Determinism;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.expression.RowKeyColumnExpression;
+import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
@@ -105,9 +107,14 @@ public class TraceQueryPlan implements QueryPlan {
     public ExplainPlan getExplainPlan() throws SQLException {
         return ExplainPlan.EMPTY_PLAN;
     }
-
+    
     @Override
     public ResultIterator iterator() throws SQLException {
+    	return iterator(DefaultParallelScanGrouper.getInstance());
+    }
+
+    @Override
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
         final PhoenixConnection conn = stmt.getConnection();
         if (conn.getTraceScope() == null && !traceStatement.isTraceOn()) {
             return ResultIterator.EMPTY_ITERATOR;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 00e843d..67222d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -45,6 +45,7 @@ import org.apache.phoenix.iterate.OrderedAggregatingResultIterator;
 import org.apache.phoenix.iterate.OrderedResultIterator;
 import org.apache.phoenix.iterate.ParallelIteratorFactory;
 import org.apache.phoenix.iterate.ParallelIterators;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.PeekingResultIterator;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.SequenceResultIterator;
@@ -141,7 +142,7 @@ public class AggregatePlan extends BaseQueryPlan {
     }
     
     @Override
-    protected ResultIterator newIterator() throws SQLException {
+    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper) throws SQLException {
         if (groupBy.isEmpty()) {
             UngroupedAggregateRegionObserver.serializeIntoScan(context.getScan());
         } else {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index 8b6de1d..37b73c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -46,8 +46,10 @@ import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.expression.ProjectedColumnExpression;
 import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
 import org.apache.phoenix.iterate.DelegateResultIterator;
 import org.apache.phoenix.iterate.ParallelIteratorFactory;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.parse.FilterableStatement;
@@ -155,11 +157,15 @@ public abstract class BaseQueryPlan implements QueryPlan {
 //    }
     
     @Override
+    public final ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
+        return iterator(Collections.<SQLCloseable>emptyList(), scanGrouper);
+    }
+    
     public final ResultIterator iterator() throws SQLException {
-        return iterator(Collections.<SQLCloseable>emptyList());
+        return iterator(Collections.<SQLCloseable>emptyList(), DefaultParallelScanGrouper.getInstance());
     }
 
-    public final ResultIterator iterator(final List<? extends SQLCloseable> dependencies) throws SQLException {
+    public final ResultIterator iterator(final List<? extends SQLCloseable> dependencies, ParallelScanGrouper scanGrouper) throws SQLException {
         if (context.getScanRanges() == ScanRanges.NOTHING) {
             return ResultIterator.EMPTY_ITERATOR;
         }
@@ -235,7 +241,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
         	LOG.debug(LogUtil.addCustomAnnotations("Scan ready for iteration: " + scan, connection));
         }
         
-        ResultIterator iterator = newIterator();
+        ResultIterator iterator = newIterator(scanGrouper);
         iterator = dependencies.isEmpty() ?
                 iterator : new DelegateResultIterator(iterator) {
             @Override
@@ -361,7 +367,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
         }
     }
 
-    abstract protected ResultIterator newIterator() throws SQLException;
+    abstract protected ResultIterator newIterator(ParallelScanGrouper scanGrouper) throws SQLException;
     
     @Override
     public long getEstimatedSize() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
index 30adbe9..3df0447 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
@@ -48,6 +48,7 @@ import org.apache.phoenix.iterate.LimitingResultIterator;
 import org.apache.phoenix.iterate.LookAheadResultIterator;
 import org.apache.phoenix.iterate.OrderedAggregatingResultIterator;
 import org.apache.phoenix.iterate.OrderedResultIterator;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.PeekingResultIterator;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.SequenceResultIterator;
@@ -80,8 +81,8 @@ public class ClientAggregatePlan extends ClientProcessingPlan {
     }
 
     @Override
-    public ResultIterator iterator() throws SQLException {
-        ResultIterator iterator = delegate.iterator();
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
+        ResultIterator iterator = delegate.iterator(scanGrouper);
         if (where != null) {
             iterator = new FilterResultIterator(iterator, where);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
index 8e787b4..b189933 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
@@ -17,11 +17,15 @@
  */
 package org.apache.phoenix.execute;
 
+import java.sql.SQLException;
+
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
+import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.schema.TableRef;
 
@@ -79,4 +83,9 @@ public abstract class ClientProcessingPlan extends DelegateQueryPlan {
     public FilterableStatement getStatement() {
         return statement;
     }
+    
+    @Override
+    public ResultIterator iterator() throws SQLException {
+        return iterator(DefaultParallelScanGrouper.getInstance());
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
index 01fbd11..4bf1889 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
@@ -29,6 +29,7 @@ import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.iterate.FilterResultIterator;
 import org.apache.phoenix.iterate.LimitingResultIterator;
 import org.apache.phoenix.iterate.OrderedResultIterator;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.SequenceResultIterator;
 import org.apache.phoenix.parse.FilterableStatement;
@@ -49,8 +50,8 @@ public class ClientScanPlan extends ClientProcessingPlan {
     }
 
     @Override
-    public ResultIterator iterator() throws SQLException {
-        ResultIterator iterator = delegate.iterator();
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
+        ResultIterator iterator = delegate.iterator(scanGrouper);
         if (where != null) {
             iterator = new FilterResultIterator(iterator, where);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
index fda53ea..98eb2dd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
@@ -27,6 +27,7 @@ import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
 import org.apache.phoenix.parse.FilterableStatement;
@@ -51,7 +52,7 @@ public class DegenerateQueryPlan extends BaseQueryPlan {
     }
 
     @Override
-    protected ResultIterator newIterator() throws SQLException {
+    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper) throws SQLException {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 57fa25a..05ef1ec 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -49,7 +49,9 @@ import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.InListExpression;
 import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.expression.RowValueConstructorExpression;
+import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
 import org.apache.phoenix.iterate.FilterResultIterator;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.job.JobManager.JobCallable;
@@ -113,9 +115,14 @@ public class HashJoinPlan extends DelegateQueryPlan {
         this.subPlans = subPlans;
         this.recompileWhereClause = recompileWhereClause;
     }
-
+    
     @Override
     public ResultIterator iterator() throws SQLException {
+    	return iterator(DefaultParallelScanGrouper.getInstance());
+    }
+
+    @Override
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
         int count = subPlans.length;
         PhoenixConnection connection = getContext().getConnection();
         ConnectionQueryServices services = connection.getQueryServices();
@@ -191,7 +198,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
             HashJoinInfo.serializeHashJoinIntoScan(scan, joinInfo);
         }
         
-        ResultIterator iterator = joinInfo == null ? delegate.iterator() : ((BaseQueryPlan) delegate).iterator(dependencies);
+        ResultIterator iterator = joinInfo == null ? delegate.iterator(scanGrouper) : ((BaseQueryPlan) delegate).iterator(dependencies, scanGrouper);
         if (statement.getInnerSelectStatement() != null && postFilter != null) {
             iterator = new FilterResultIterator(iterator, postFilter);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 884d835..b9dd2f2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -38,6 +38,7 @@ import org.apache.phoenix.iterate.MergeSortRowKeyResultIterator;
 import org.apache.phoenix.iterate.MergeSortTopNResultIterator;
 import org.apache.phoenix.iterate.ParallelIteratorFactory;
 import org.apache.phoenix.iterate.ParallelIterators;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.ResultIterators;
 import org.apache.phoenix.iterate.RoundRobinResultIterator;
@@ -161,7 +162,7 @@ public class ScanPlan extends BaseQueryPlan {
     }
 
     @Override
-    protected ResultIterator newIterator() throws SQLException {
+    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper) throws SQLException {
         // Set any scan attributes before creating the scanner, as it will be too late afterwards
     	Scan scan = context.getScan();
         scan.setAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY, QueryConstants.TRUE);
@@ -177,9 +178,9 @@ public class ScanPlan extends BaseQueryPlan {
         Integer perScanLimit = !allowPageFilter || isOrdered ? null : limit;
         ResultIterators iterators;
         if (isSerial) {
-        	iterators = new SerialIterators(this, perScanLimit, parallelIteratorFactory);
+        	iterators = new SerialIterators(this, perScanLimit, parallelIteratorFactory, scanGrouper);
         } else {
-        	iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory);
+        	iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory, scanGrouper);
         }
         splits = iterators.getSplits();
         scans = iterators.getScans();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index 46ade33..1bbda07 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -44,7 +44,9 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
 import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
 import org.apache.phoenix.iterate.MappedByteBufferQueue;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
 import org.apache.phoenix.parse.FilterableStatement;
@@ -114,10 +116,15 @@ public class SortMergeJoinPlan implements QueryPlan {
     }
 
     @Override
-    public ResultIterator iterator() throws SQLException {        
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {        
         return type == JoinType.Semi || type == JoinType.Anti ? 
-                new SemiAntiJoinIterator(lhsPlan.iterator(), rhsPlan.iterator()) :
-                new BasicJoinIterator(lhsPlan.iterator(), rhsPlan.iterator());
+                new SemiAntiJoinIterator(lhsPlan.iterator(scanGrouper), rhsPlan.iterator(scanGrouper)) :
+                new BasicJoinIterator(lhsPlan.iterator(scanGrouper), rhsPlan.iterator(scanGrouper));
+    }
+    
+    @Override
+    public ResultIterator iterator() throws SQLException {        
+        return iterator(DefaultParallelScanGrouper.getInstance());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
index c9cbd15..e8d9af0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
@@ -23,8 +23,10 @@ import java.util.List;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
 import org.apache.phoenix.iterate.DelegateResultIterator;
 import org.apache.phoenix.iterate.FilterResultIterator;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.schema.tuple.Tuple;
 
@@ -50,10 +52,15 @@ public class TupleProjectionPlan extends DelegateQueryPlan {
 
         return new ExplainPlan(planSteps);
     }
-
+    
     @Override
     public ResultIterator iterator() throws SQLException {
-        ResultIterator iterator = new DelegateResultIterator(delegate.iterator()) {
+    	return iterator(DefaultParallelScanGrouper.getInstance());
+    }
+
+    @Override
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
+        ResultIterator iterator = new DelegateResultIterator(delegate.iterator(scanGrouper)) {
             
             @Override
             public Tuple next() throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
index 2bed3a0..53745fe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
@@ -34,6 +34,7 @@ import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.iterate.ConcatResultIterator;
 import org.apache.phoenix.iterate.LimitingResultIterator;
 import org.apache.phoenix.iterate.MergeSortTopNResultIterator;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.UnionResultIterators;
 import org.apache.phoenix.parse.FilterableStatement;
@@ -123,6 +124,11 @@ public class UnionPlan implements QueryPlan {
     }
 
     @Override
+    public final ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
+        return iterator(Collections.<SQLCloseable>emptyList());
+    }
+    
+    @Override
     public final ResultIterator iterator() throws SQLException {
         return iterator(Collections.<SQLCloseable>emptyList());
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 43731cb..cf66d93 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -103,6 +103,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
     private final byte[] physicalTableName;
     private final QueryPlan plan;
     protected final String scanId;
+    private final ParallelScanGrouper scanGrouper;
     // TODO: too much nesting here - breakup into new classes.
     private final List<List<List<Pair<Scan,Future<PeekingResultIterator>>>>> allFutures;
     
@@ -133,9 +134,10 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         return true;
     }
     
-    public BaseResultIterators(QueryPlan plan, Integer perScanLimit) throws SQLException {
+    public BaseResultIterators(QueryPlan plan, Integer perScanLimit, ParallelScanGrouper scanGrouper) throws SQLException {
         super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), plan.getOrderBy(), plan.getStatement().getHint(), plan.getLimit());
         this.plan = plan;
+        this.scanGrouper = scanGrouper;
         StatementContext context = plan.getContext();
         TableRef tableRef = plan.getTableRef();
         PTable table = tableRef.getTable();
@@ -371,24 +373,11 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
     }
     
     private List<Scan> addNewScan(List<List<Scan>> parallelScans, List<Scan> scans, Scan scan, byte[] startKey, boolean crossedRegionBoundary) {
-        PTable table = getTable();
-        boolean startNewScanList = false;
-        if (!plan.isRowKeyOrdered()) {
-            startNewScanList = true;
-        } else if (crossedRegionBoundary) {
-            if (table.getIndexType() == IndexType.LOCAL) {
-                startNewScanList = true;
-            } else if (table.getBucketNum() != null) {
-                startNewScanList = scans.isEmpty() ||
-                        ScanUtil.crossesPrefixBoundary(startKey,
-                                ScanUtil.getPrefix(scans.get(scans.size()-1).getStartRow(), SaltingUtil.NUM_SALTING_BYTES), 
-                                SaltingUtil.NUM_SALTING_BYTES);
-            }
-        }
+        boolean startNewScan = scanGrouper.shouldStartNewScan(plan, scans, startKey, crossedRegionBoundary);
         if (scan != null) {
-            scans.add(scan);
+        	scans.add(scan);
         }
-        if (startNewScanList && !scans.isEmpty()) {
+        if (startNewScan && !scans.isEmpty()) {
             parallelScans.add(scans);
             scans = Lists.newArrayListWithExpectedSize(1);
         }
@@ -410,7 +399,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         Scan scan = context.getScan();
         List<HRegionLocation> regionLocations = context.getConnection().getQueryServices()
                 .getAllTableRegions(physicalTableName);
-        
         List<byte[]> regionBoundaries = toBoundaries(regionLocations);
         ScanRanges scanRanges = context.getScanRanges();
         PTable table = getTable();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java
new file mode 100644
index 0000000..5c7136f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.util.ScanUtil;
+
+/**
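+ * Default implementation that starts a new scan group if a plan is not row key ordered (a row key ordered plan requires a merge sort),
+ * or if a scan crosses a region boundary and the table is a local index, or is salted and the scan list is empty or the start key crosses the salt prefix boundary of the last scan.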
+ */
+public class DefaultParallelScanGrouper implements ParallelScanGrouper {
+	
+	private static final DefaultParallelScanGrouper INSTANCE = new DefaultParallelScanGrouper();
+
+    public static DefaultParallelScanGrouper getInstance() {
+        return INSTANCE;
+    }
+    
+    private DefaultParallelScanGrouper() {}
+
+	@Override
+	public boolean shouldStartNewScan(QueryPlan plan, List<Scan> scans, byte[] startKey, boolean crossedRegionBoundary) {
+		PTable table = plan.getTableRef().getTable();
+		boolean startNewScanGroup = false;
+        if (!plan.isRowKeyOrdered()) {
+            startNewScanGroup = true;
+        } else if (crossedRegionBoundary) {
+            if (table.getIndexType() == IndexType.LOCAL) {
+                startNewScanGroup = true;
+            } else if (table.getBucketNum() != null) {
+                startNewScanGroup = scans.isEmpty() ||
+                        ScanUtil.crossesPrefixBoundary(startKey,
+                                ScanUtil.getPrefix(scans.get(scans.size()-1).getStartRow(), SaltingUtil.NUM_SALTING_BYTES), 
+                                SaltingUtil.NUM_SALTING_BYTES);
+            }
+        }
+        return startNewScanGroup;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
new file mode 100644
index 0000000..bf2666d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.QueryPlan;
+
+/**
+ * Scan grouper that starts a new scan group if a plan is NOT row key ordered, or whenever a
+ * scan crosses a region boundary (the table's salting and index type are not consulted)
+ */
+public class MapReduceParallelScanGrouper implements ParallelScanGrouper {
+	
+	private static final MapReduceParallelScanGrouper INSTANCE = new MapReduceParallelScanGrouper();
+
+    public static MapReduceParallelScanGrouper getInstance() {
+        return INSTANCE;
+    }
+    
+    private MapReduceParallelScanGrouper() {} // instances are only handed out through getInstance()
+
+	@Override
+	public boolean shouldStartNewScan(QueryPlan plan, List<Scan> scans,
+			byte[] startKey, boolean crossedRegionBoundary) {
+		return !plan.isRowKeyOrdered() || crossedRegionBoundary; // scans and startKey are not used by this grouper
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index 2dfbfe3..87f8335 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -54,11 +54,16 @@ public class ParallelIterators extends BaseResultIterators {
 	private static final String NAME = "PARALLEL";
     private final ParallelIteratorFactory iteratorFactory;
     
-    public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory)
+    public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper)
             throws SQLException {
-        super(plan, perScanLimit);
+        super(plan, perScanLimit, scanGrouper);
         this.iteratorFactory = iteratorFactory;
     }   
+    
+    public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory)
+            throws SQLException {
+        this(plan, perScanLimit, iteratorFactory, DefaultParallelScanGrouper.getInstance()); // no grouper given: use the default one
+    }  
 
     @Override
     protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScanGrouper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScanGrouper.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScanGrouper.java
new file mode 100644
index 0000000..0becf4f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScanGrouper.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.QueryPlan;
+
+/**
+ * Interface for a parallel scan grouper, which decides when a new group of parallel scans is started
+ */
+public interface ParallelScanGrouper {
+
+	/**
+	 * Determines whether to create a new group of parallel scans.
+	 *
+	 * @param plan						current query plan
+	 * @param scans						current scan group
+	 * @param startKey					start key of scan
+	 * @param crossedRegionBoundary		whether we crossed a region boundary
+	 * @return true if we should create a new group of scans
+	 */
+	boolean shouldStartNewScan(QueryPlan plan, List<Scan> scans, byte[] startKey, boolean crossedRegionBoundary);
+	
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index 516d73e..fa18c83 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -51,9 +51,9 @@ public class SerialIterators extends BaseResultIterators {
 	private static final String NAME = "SERIAL";
     private final ParallelIteratorFactory iteratorFactory;
     
-    public SerialIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory)
+    public SerialIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper)
             throws SQLException {
-        super(plan, perScanLimit);
+        super(plan, perScanLimit, scanGrouper);
         Preconditions.checkArgument(perScanLimit != null); // must be a limit specified
         this.iteratorFactory = iteratorFactory;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index c6c5b0c..2bb3b92 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -71,6 +71,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.expression.RowKeyColumnExpression;
 import org.apache.phoenix.iterate.MaterializedResultIterator;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.parse.AddColumnStatement;
 import org.apache.phoenix.parse.AliasedNode;
@@ -446,6 +447,11 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                 public ResultIterator iterator() throws SQLException {
                     return iterator;
                 }
+                
+                @Override
+                public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
+                    return iterator; // scanGrouper is ignored: returns the same iterator as iterator()
+                }
 
                 @Override
                 public long getEstimatedSize() {
@@ -511,6 +517,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                 public boolean useRoundRobinIterator() throws SQLException {
                     return false;
                 }
+                
             };
         }
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
index 31759b4..8ee1634 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
@@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -36,6 +37,7 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.mapreduce.util.ConnectionUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
@@ -112,10 +114,10 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
             Preconditions.checkNotNull(selectStatement);
             final Statement statement = connection.createStatement();
             final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
-            // Optimize the query plan so that we potentially use secondary indexes
+            // Optimize the query plan so that we potentially use secondary indexes            
             final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
             // Initialize the query plan so it sets up the parallel scans
-            queryPlan.iterator();
+            queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
             return queryPlan;
         } catch (Exception exception) {
             LOG.error(String.format("Failed to get the query plan with error [%s]",

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
index b222fc9..caee3cd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.InputSplit;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index ecb088a..ad65373 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -44,6 +44,7 @@ import org.apache.phoenix.compile.SequenceManager;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.iterate.ParallelIterators;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.SpoolingResultIterator;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -368,6 +369,11 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest {
             }
 
             @Override
+            public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
+                return ResultIterator.EMPTY_ITERATOR; // scanGrouper is ignored: same result as iterator()
+            }
+            
+            @Override
             public ResultIterator iterator() throws SQLException {
                 return ResultIterator.EMPTY_ITERATOR;
             }