Posted to commits@phoenix.apache.org by td...@apache.org on 2015/07/01 09:43:36 UTC

[1/3] phoenix git commit: PHOENIX-2085 Include joda-time in phoenix server jar

Repository: phoenix
Updated Branches:
  refs/heads/master 72a7356bc -> 54da7d1d6


PHOENIX-2085 Include joda-time in phoenix server jar


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d2392bea
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d2392bea
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d2392bea

Branch: refs/heads/master
Commit: d2392beae3318099685856ffd18825028f21a7d1
Parents: 72a7356
Author: Thomas D'Silva <td...@salesforce.com>
Authored: Mon Jun 29 13:21:30 2015 -0700
Committer: Thomas D'Silva <td...@salesforce.com>
Committed: Tue Jun 30 22:10:33 2015 -0700

----------------------------------------------------------------------
 phoenix-assembly/src/build/server.xml | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d2392bea/phoenix-assembly/src/build/server.xml
----------------------------------------------------------------------
diff --git a/phoenix-assembly/src/build/server.xml b/phoenix-assembly/src/build/server.xml
index 12d3d81..78a4b1f 100644
--- a/phoenix-assembly/src/build/server.xml
+++ b/phoenix-assembly/src/build/server.xml
@@ -38,6 +38,7 @@
         <include>org.iq80.snappy:snappy</include>
         <include>org.jruby.joni:joni</include>
         <include>org.jruby.jcodings:jcodings</include>
+	<include>joda-time:joda-time</include>
       </includes>
     </dependencySet>
     <dependencySet>
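
For reference, a minimal, hypothetical sketch (not taken from the Phoenix source) of one plausible kind of joda-time usage that makes the library a runtime dependency once date/time built-ins are evaluated inside the server jar on the region servers:

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

public class JodaUsageSketch {
    public static void main(String[] args) {
        // Resolve a timestamp in UTC and pull out calendar fields, the sort of
        // work date/time functions typically delegate to joda-time.
        DateTime now = new DateTime(DateTimeZone.UTC);
        System.out.println(now.getWeekOfWeekyear() + " " + now.getDayOfWeek());
    }
}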


[3/3] phoenix git commit: PHOENIX-2059 MR index build does not handle table with a schema name correctly

Posted by td...@apache.org.
PHOENIX-2059 MR index build does not handle table with a schema name correctly


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/54da7d1d
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/54da7d1d
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/54da7d1d

Branch: refs/heads/master
Commit: 54da7d1d6b2ecd27c8c98211e84484029b6d39c2
Parents: 6a07d45
Author: Thomas D'Silva <td...@salesforce.com>
Authored: Mon Jun 22 17:45:58 2015 -0700
Committer: Thomas D'Silva <td...@salesforce.com>
Committed: Tue Jun 30 22:21:37 2015 -0700

----------------------------------------------------------------------
 .../apache/phoenix/mapreduce/IndexToolIT.java   | 47 ++++++++++++--------
 .../phoenix/mapreduce/index/IndexTool.java      | 15 ++++---
 2 files changed, 36 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
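
For reference, a hedged sketch (not part of this commit) of driving IndexTool programmatically for a table that lives in a schema, using the schema (-s) option that the test's getArgValues now passes; any further options the tool requires beyond -s/-dt/-it are omitted here:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.phoenix.mapreduce.index.IndexTool;

public class RunIndexToolSketch {
    public static void main(String[] args) throws Exception {
        // Populate the ASYNC index DATA_TABLE1_INDX on SCHEMA.DATA_TABLE1
        // (table and index names follow the IndexToolIT test below).
        String[] cmdArgs = new String[] {
                "-s", "SCHEMA",            // schema of the data table
                "-dt", "DATA_TABLE1",      // data table name
                "-it", "DATA_TABLE1_INDX"  // index table name
        };
        int status = ToolRunner.run(new Configuration(), new IndexTool(), cmdArgs);
        System.exit(status);
    }
}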


http://git-wip-us.apache.org/repos/asf/phoenix/blob/54da7d1d/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java
index 6761275..5d11cf2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java
@@ -42,6 +42,7 @@ import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -62,6 +63,7 @@ public class IndexToolIT {
     public static void setUp() throws Exception {
         hbaseTestUtil = new HBaseTestingUtility();
         Configuration conf = hbaseTestUtil.getConfiguration();
+        conf.setBoolean("hbase.defaults.for.version.skip", true);
         setUpConfigForMiniCluster(conf);
         hbaseTestUtil.startMiniCluster();
         hbaseTestUtil.startMiniMapReduceCluster();
@@ -71,34 +73,35 @@ public class IndexToolIT {
     
     @Test
     public void testImmutableGlobalIndex() throws Exception {
-        testSecondaryIndex("DATA_TABLE1",true, false);
+        testSecondaryIndex("SCHEMA", "DATA_TABLE1", true, false);
     }
     
     @Test
     public void testImmutableLocalIndex() throws Exception {
-        testSecondaryIndex("DATA_TABLE2",true, true);
+        testSecondaryIndex("SCHEMA", "DATA_TABLE2", true, true);
     }
     
     @Test
     public void testMutableGlobalIndex() throws Exception {
-        testSecondaryIndex("DATA_TABLE3",false, false);
+        testSecondaryIndex("SCHEMA", "DATA_TABLE3", false, false);
     }
     
     @Test
     public void testMutableLocalIndex() throws Exception {
-        testSecondaryIndex("DATA_TABLE4",false, true);
+        testSecondaryIndex("SCHEMA", "DATA_TABLE4", false, true);
     }
     
-    public void testSecondaryIndex(final String dataTable , final boolean isImmutable , final boolean isLocal) throws Exception {
+    public void testSecondaryIndex(final String schemaName, final String dataTable, final boolean isImmutable , final boolean isLocal) throws Exception {
         
+    	final String fullTableName = SchemaUtil.getTableName(schemaName, dataTable);
         final String indxTable = String.format("%s_%s",dataTable,"INDX");
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         Connection conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum,props);
         Statement stmt = conn.createStatement();
         try {
         
-            stmt.execute(String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) %s",dataTable, (isImmutable ? "IMMUTABLE_ROWS=true" :"")));
-            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)",dataTable);
+            stmt.execute(String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) %s", fullTableName, (isImmutable ? "IMMUTABLE_ROWS=true" :"")));
+            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", fullTableName);
             PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
             
             int id = 1;
@@ -107,15 +110,15 @@ public class IndexToolIT {
             upsertRow(stmt1, id++);
             conn.commit();
             
-            stmt.execute(String.format("CREATE %s INDEX %s ON %s (UPPER(NAME)) ASYNC ", (isLocal ? "LOCAL" : ""), indxTable,dataTable));
+            stmt.execute(String.format("CREATE %s INDEX %s ON %s  (UPPER(NAME)) ASYNC ", (isLocal ? "LOCAL" : ""), indxTable, fullTableName));
    
             //verify rows are fetched from data table.
-            String selectSql = String.format("SELECT UPPER(NAME),ID FROM %s",dataTable);
+            String selectSql = String.format("SELECT UPPER(NAME),ID FROM %s", fullTableName);
             ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
             String actualExplainPlan = QueryUtil.getExplainPlan(rs);
             
             //assert we are pulling from data table.
-            assertEquals(String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s",dataTable),actualExplainPlan);
+            assertEquals(String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s", fullTableName), actualExplainPlan);
             
             rs = stmt1.executeQuery(selectSql);
             assertTrue(rs.next());
@@ -127,7 +130,7 @@ public class IndexToolIT {
             final IndexTool indexingTool = new IndexTool();
             indexingTool.setConf(new Configuration(hbaseTestUtil.getConfiguration()));
             
-            final String[] cmdArgs = getArgValues(dataTable,indxTable);
+            final String[] cmdArgs = getArgValues(schemaName, dataTable, indxTable);
             int status = indexingTool.run(cmdArgs);
             assertEquals(0, status);
             
@@ -135,11 +138,13 @@ public class IndexToolIT {
             upsertRow(stmt1, 3);
             upsertRow(stmt1, 4);
             conn.commit();
+            
+            rs = stmt1.executeQuery("SELECT * FROM "+SchemaUtil.getTableName(schemaName, indxTable));
 
             //assert we are pulling from index table.
             rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
             actualExplainPlan = QueryUtil.getExplainPlan(rs);
-            assertExplainPlan(actualExplainPlan,dataTable,indxTable,isLocal);
+            assertExplainPlan(actualExplainPlan,schemaName,dataTable,indxTable,isLocal);
             
             rs = stmt.executeQuery(selectSql);
             assertTrue(rs.next());
@@ -160,7 +165,7 @@ public class IndexToolIT {
       
             assertFalse(rs.next());
             
-            conn.createStatement().execute(String.format("DROP INDEX  %s ON %s",indxTable , dataTable));
+            conn.createStatement().execute(String.format("DROP INDEX  %s ON %s",indxTable , fullTableName));
         } finally {
             conn.close();
         }
@@ -219,14 +224,14 @@ public class IndexToolIT {
             final IndexTool indexingTool = new IndexTool();
             indexingTool.setConf(new Configuration(hbaseTestUtil.getConfiguration()));
             
-            final String[] cmdArgs = getArgValues(dataTable,indxTable);
+            final String[] cmdArgs = getArgValues(null, dataTable,indxTable);
             int status = indexingTool.run(cmdArgs);
             assertEquals(0, status);
             
             //assert we are pulling from index table.
             rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
             actualExplainPlan = QueryUtil.getExplainPlan(rs);
-            assertExplainPlan(actualExplainPlan,dataTable,indxTable,false);
+            assertExplainPlan(actualExplainPlan,null,dataTable,indxTable,false);
             
             rs = stmt.executeQuery(selectSql);
             assertTrue(rs.next());
@@ -242,23 +247,27 @@ public class IndexToolIT {
         }
     }
     
-    private void assertExplainPlan(final String actualExplainPlan, String dataTable,
+    private void assertExplainPlan(final String actualExplainPlan, String schemaName, String dataTable,
             String indxTable, boolean isLocal) {
         
         String expectedExplainPlan = "";
         if(isLocal) {
-            final String localIndexName = MetaDataUtil.getLocalIndexTableName(dataTable);
+            final String localIndexName = MetaDataUtil.getLocalIndexTableName(SchemaUtil.getTableName(schemaName, dataTable));
             expectedExplainPlan = String.format("CLIENT 1-CHUNK PARALLEL 1-WAY RANGE SCAN OVER %s [-32768]"
                 + "\n    SERVER FILTER BY FIRST KEY ONLY", localIndexName);
         } else {
             expectedExplainPlan = String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s"
-                    + "\n    SERVER FILTER BY FIRST KEY ONLY",indxTable);
+                    + "\n    SERVER FILTER BY FIRST KEY ONLY",SchemaUtil.getTableName(schemaName, indxTable));
         }
         assertEquals(expectedExplainPlan,actualExplainPlan);
     }
 
-    private String[] getArgValues(String dataTable, String indxTable) {
+    private String[] getArgValues(String schemaName, String dataTable, String indxTable) {
         final List<String> args = Lists.newArrayList();
+        if (schemaName!=null) {
+        	args.add("-s");
+        	args.add(schemaName);
+        }
         args.add("-dt");
         args.add(dataTable);
         args.add("-it");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/54da7d1d/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 300f575..d3a1adf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -30,6 +30,7 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
@@ -164,12 +165,12 @@ public class IndexTool extends Configured implements Tool {
             final String qIndexTable = SchemaUtil.getTableName(schemaName, indexTable);
          
             connection = ConnectionUtil.getInputConnection(configuration);
-            if(!isValidIndexTable(connection, dataTable, indexTable)) {
+            if(!isValidIndexTable(connection, qDataTable, indexTable)) {
                 throw new IllegalArgumentException(String.format(" %s is not an index table for %s ",qIndexTable,qDataTable));
             }
             
-            final PTable pdataTable = PhoenixRuntime.getTable(connection, dataTable);
-            final PTable pindexTable = PhoenixRuntime.getTable(connection, indexTable);
+            final PTable pdataTable = PhoenixRuntime.getTable(connection, qDataTable);
+            final PTable pindexTable = PhoenixRuntime.getTable(connection, qIndexTable);
             
             // this is set to ensure index tables remains consistent post population.
             long indxTimestamp = pindexTable.getTimeStamp();
@@ -178,7 +179,7 @@ public class IndexTool extends Configured implements Tool {
             // check if the index type is LOCAL, if so, set the logicalIndexName that is computed from the dataTable name.
             String logicalIndexTable = qIndexTable;
             if(IndexType.LOCAL.equals(pindexTable.getIndexType())) {
-                logicalIndexTable  = MetaDataUtil.getLocalIndexTableName(dataTable);
+                logicalIndexTable  = MetaDataUtil.getLocalIndexTableName(qDataTable);
             }
             
             final PhoenixConnection pConnection = connection.unwrap(PhoenixConnection.class);
@@ -187,7 +188,7 @@ public class IndexTool extends Configured implements Tool {
             
             final List<String> indexColumns = ddlCompiler.getIndexColumnNames();
             final String selectQuery = ddlCompiler.getSelectQuery();
-            final String upsertQuery = QueryUtil.constructUpsertStatement(indexTable, indexColumns, Hint.NO_INDEX);
+            final String upsertQuery = QueryUtil.constructUpsertStatement(qIndexTable, indexColumns, Hint.NO_INDEX);
        
             configuration.set(PhoenixConfigurationUtil.UPSERT_STATEMENT, upsertQuery);
             PhoenixConfigurationUtil.setOutputTableName(configuration, logicalIndexTable);
@@ -231,11 +232,11 @@ public class IndexTool extends Configured implements Tool {
             }
             
             // finally update the index state to ACTIVE.
-            updateIndexState(connection,dataTable,indexTable,PIndexState.ACTIVE);
+            updateIndexState(connection,qDataTable,indexTable,PIndexState.ACTIVE);
             return 0;
             
         } catch (Exception ex) {
-           LOG.error(" An exception occured while performing the indexing job , error message {} ",ex.getMessage());
+           LOG.error(" An exception occured while performing the indexing job : "+ ExceptionUtils.getStackTrace(ex));
            return -1;
         } finally {
             try {


[2/3] phoenix git commit: PHOENIX-2075 MR integration uses single mapper unless table is salted

Posted by td...@apache.org.
PHOENIX-2075 MR integration uses single mapper unless table is salted


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6a07d45a
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6a07d45a
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6a07d45a

Branch: refs/heads/master
Commit: 6a07d45a76d6c07666b777676627ebc313d0d7b5
Parents: d2392be
Author: Thomas D'Silva <td...@salesforce.com>
Authored: Fri Jun 26 14:54:31 2015 -0700
Committer: Thomas D'Silva <td...@salesforce.com>
Committed: Tue Jun 30 22:13:59 2015 -0700

----------------------------------------------------------------------
 .../org/apache/phoenix/compile/QueryPlan.java   |  3 +
 .../apache/phoenix/compile/TraceQueryPlan.java  |  9 ++-
 .../apache/phoenix/execute/AggregatePlan.java   |  3 +-
 .../apache/phoenix/execute/BaseQueryPlan.java   | 14 +++--
 .../phoenix/execute/ClientAggregatePlan.java    |  5 +-
 .../phoenix/execute/ClientProcessingPlan.java   |  9 +++
 .../apache/phoenix/execute/ClientScanPlan.java  |  5 +-
 .../phoenix/execute/DegenerateQueryPlan.java    |  3 +-
 .../apache/phoenix/execute/HashJoinPlan.java    | 11 +++-
 .../org/apache/phoenix/execute/ScanPlan.java    |  7 ++-
 .../phoenix/execute/SortMergeJoinPlan.java      | 13 +++-
 .../phoenix/execute/TupleProjectionPlan.java    | 11 +++-
 .../org/apache/phoenix/execute/UnionPlan.java   |  6 ++
 .../phoenix/iterate/BaseResultIterators.java    | 24 ++------
 .../iterate/DefaultParallelScanGrouper.java     | 62 ++++++++++++++++++++
 .../iterate/MapReduceParallelScanGrouper.java   | 45 ++++++++++++++
 .../phoenix/iterate/ParallelIterators.java      |  9 ++-
 .../phoenix/iterate/ParallelScanGrouper.java    | 41 +++++++++++++
 .../apache/phoenix/iterate/SerialIterators.java |  4 +-
 .../apache/phoenix/jdbc/PhoenixStatement.java   |  7 +++
 .../phoenix/mapreduce/PhoenixInputFormat.java   |  6 +-
 .../phoenix/mapreduce/PhoenixInputSplit.java    |  1 +
 .../query/ParallelIteratorsSplitTest.java       |  6 ++
 23 files changed, 259 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
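
The crux of this change: a QueryPlan's iterator can now be primed with a ParallelScanGrouper, and the MapReduce input format passes MapReduceParallelScanGrouper so that a new scan group starts at every region boundary rather than only for salted or local-index tables. A minimal sketch (placeholder JDBC URL and table name) of initializing a plan the way the PhoenixInputFormat hunk below now does:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
import org.apache.phoenix.jdbc.PhoenixStatement;

public class MrScanGroupingSketch {
    public static void main(String[] args) throws Exception {
        Connection connection = DriverManager.getConnection("jdbc:phoenix:localhost");
        Statement statement = connection.createStatement();
        PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
        // Optimize so secondary indexes can be used, then set up the parallel scans.
        QueryPlan queryPlan = pstmt.optimizeQuery("SELECT * FROM MY_TABLE");
        // Priming the iterator with the MR grouper makes the plan group its scans
        // per region, so the MR job is no longer limited to a single mapper for
        // unsalted tables.
        queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
        connection.close();
    }
}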


http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
index d0c63fa..1c0c469 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.query.KeyRange;
@@ -46,6 +47,8 @@ public interface QueryPlan extends StatementPlan {
      */
     public ResultIterator iterator() throws SQLException;
     
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException;
+    
     public long getEstimatedSize();
     
     // TODO: change once joins are supported

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
index 11377de..93a2da0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
@@ -38,6 +38,8 @@ import org.apache.phoenix.expression.Determinism;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.expression.RowKeyColumnExpression;
+import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
@@ -105,9 +107,14 @@ public class TraceQueryPlan implements QueryPlan {
     public ExplainPlan getExplainPlan() throws SQLException {
         return ExplainPlan.EMPTY_PLAN;
     }
-
+    
     @Override
     public ResultIterator iterator() throws SQLException {
+    	return iterator(DefaultParallelScanGrouper.getInstance());
+    }
+
+    @Override
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
         final PhoenixConnection conn = stmt.getConnection();
         if (conn.getTraceScope() == null && !traceStatement.isTraceOn()) {
             return ResultIterator.EMPTY_ITERATOR;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 00e843d..67222d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -45,6 +45,7 @@ import org.apache.phoenix.iterate.OrderedAggregatingResultIterator;
 import org.apache.phoenix.iterate.OrderedResultIterator;
 import org.apache.phoenix.iterate.ParallelIteratorFactory;
 import org.apache.phoenix.iterate.ParallelIterators;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.PeekingResultIterator;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.SequenceResultIterator;
@@ -141,7 +142,7 @@ public class AggregatePlan extends BaseQueryPlan {
     }
     
     @Override
-    protected ResultIterator newIterator() throws SQLException {
+    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper) throws SQLException {
         if (groupBy.isEmpty()) {
             UngroupedAggregateRegionObserver.serializeIntoScan(context.getScan());
         } else {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index 8b6de1d..37b73c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -46,8 +46,10 @@ import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.expression.ProjectedColumnExpression;
 import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
 import org.apache.phoenix.iterate.DelegateResultIterator;
 import org.apache.phoenix.iterate.ParallelIteratorFactory;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.parse.FilterableStatement;
@@ -155,11 +157,15 @@ public abstract class BaseQueryPlan implements QueryPlan {
 //    }
     
     @Override
+    public final ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
+        return iterator(Collections.<SQLCloseable>emptyList(), scanGrouper);
+    }
+    
     public final ResultIterator iterator() throws SQLException {
-        return iterator(Collections.<SQLCloseable>emptyList());
+        return iterator(Collections.<SQLCloseable>emptyList(), DefaultParallelScanGrouper.getInstance());
     }
 
-    public final ResultIterator iterator(final List<? extends SQLCloseable> dependencies) throws SQLException {
+    public final ResultIterator iterator(final List<? extends SQLCloseable> dependencies, ParallelScanGrouper scanGrouper) throws SQLException {
         if (context.getScanRanges() == ScanRanges.NOTHING) {
             return ResultIterator.EMPTY_ITERATOR;
         }
@@ -235,7 +241,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
         	LOG.debug(LogUtil.addCustomAnnotations("Scan ready for iteration: " + scan, connection));
         }
         
-        ResultIterator iterator = newIterator();
+        ResultIterator iterator = newIterator(scanGrouper);
         iterator = dependencies.isEmpty() ?
                 iterator : new DelegateResultIterator(iterator) {
             @Override
@@ -361,7 +367,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
         }
     }
 
-    abstract protected ResultIterator newIterator() throws SQLException;
+    abstract protected ResultIterator newIterator(ParallelScanGrouper scanGrouper) throws SQLException;
     
     @Override
     public long getEstimatedSize() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
index 30adbe9..3df0447 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
@@ -48,6 +48,7 @@ import org.apache.phoenix.iterate.LimitingResultIterator;
 import org.apache.phoenix.iterate.LookAheadResultIterator;
 import org.apache.phoenix.iterate.OrderedAggregatingResultIterator;
 import org.apache.phoenix.iterate.OrderedResultIterator;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.PeekingResultIterator;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.SequenceResultIterator;
@@ -80,8 +81,8 @@ public class ClientAggregatePlan extends ClientProcessingPlan {
     }
 
     @Override
-    public ResultIterator iterator() throws SQLException {
-        ResultIterator iterator = delegate.iterator();
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
+        ResultIterator iterator = delegate.iterator(scanGrouper);
         if (where != null) {
             iterator = new FilterResultIterator(iterator, where);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
index 8e787b4..b189933 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
@@ -17,11 +17,15 @@
  */
 package org.apache.phoenix.execute;
 
+import java.sql.SQLException;
+
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
+import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.schema.TableRef;
 
@@ -79,4 +83,9 @@ public abstract class ClientProcessingPlan extends DelegateQueryPlan {
     public FilterableStatement getStatement() {
         return statement;
     }
+    
+    @Override
+    public ResultIterator iterator() throws SQLException {
+        return iterator(DefaultParallelScanGrouper.getInstance());
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
index 01fbd11..4bf1889 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
@@ -29,6 +29,7 @@ import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.iterate.FilterResultIterator;
 import org.apache.phoenix.iterate.LimitingResultIterator;
 import org.apache.phoenix.iterate.OrderedResultIterator;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.SequenceResultIterator;
 import org.apache.phoenix.parse.FilterableStatement;
@@ -49,8 +50,8 @@ public class ClientScanPlan extends ClientProcessingPlan {
     }
 
     @Override
-    public ResultIterator iterator() throws SQLException {
-        ResultIterator iterator = delegate.iterator();
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
+        ResultIterator iterator = delegate.iterator(scanGrouper);
         if (where != null) {
             iterator = new FilterResultIterator(iterator, where);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
index fda53ea..98eb2dd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
@@ -27,6 +27,7 @@ import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
 import org.apache.phoenix.parse.FilterableStatement;
@@ -51,7 +52,7 @@ public class DegenerateQueryPlan extends BaseQueryPlan {
     }
 
     @Override
-    protected ResultIterator newIterator() throws SQLException {
+    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper) throws SQLException {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 57fa25a..05ef1ec 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -49,7 +49,9 @@ import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.InListExpression;
 import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.expression.RowValueConstructorExpression;
+import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
 import org.apache.phoenix.iterate.FilterResultIterator;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.job.JobManager.JobCallable;
@@ -113,9 +115,14 @@ public class HashJoinPlan extends DelegateQueryPlan {
         this.subPlans = subPlans;
         this.recompileWhereClause = recompileWhereClause;
     }
-
+    
     @Override
     public ResultIterator iterator() throws SQLException {
+    	return iterator(DefaultParallelScanGrouper.getInstance());
+    }
+
+    @Override
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
         int count = subPlans.length;
         PhoenixConnection connection = getContext().getConnection();
         ConnectionQueryServices services = connection.getQueryServices();
@@ -191,7 +198,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
             HashJoinInfo.serializeHashJoinIntoScan(scan, joinInfo);
         }
         
-        ResultIterator iterator = joinInfo == null ? delegate.iterator() : ((BaseQueryPlan) delegate).iterator(dependencies);
+        ResultIterator iterator = joinInfo == null ? delegate.iterator(scanGrouper) : ((BaseQueryPlan) delegate).iterator(dependencies, scanGrouper);
         if (statement.getInnerSelectStatement() != null && postFilter != null) {
             iterator = new FilterResultIterator(iterator, postFilter);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 884d835..b9dd2f2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -38,6 +38,7 @@ import org.apache.phoenix.iterate.MergeSortRowKeyResultIterator;
 import org.apache.phoenix.iterate.MergeSortTopNResultIterator;
 import org.apache.phoenix.iterate.ParallelIteratorFactory;
 import org.apache.phoenix.iterate.ParallelIterators;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.ResultIterators;
 import org.apache.phoenix.iterate.RoundRobinResultIterator;
@@ -161,7 +162,7 @@ public class ScanPlan extends BaseQueryPlan {
     }
 
     @Override
-    protected ResultIterator newIterator() throws SQLException {
+    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper) throws SQLException {
         // Set any scan attributes before creating the scanner, as it will be too late afterwards
     	Scan scan = context.getScan();
         scan.setAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY, QueryConstants.TRUE);
@@ -177,9 +178,9 @@ public class ScanPlan extends BaseQueryPlan {
         Integer perScanLimit = !allowPageFilter || isOrdered ? null : limit;
         ResultIterators iterators;
         if (isSerial) {
-        	iterators = new SerialIterators(this, perScanLimit, parallelIteratorFactory);
+        	iterators = new SerialIterators(this, perScanLimit, parallelIteratorFactory, scanGrouper);
         } else {
-        	iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory);
+        	iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory, scanGrouper);
         }
         splits = iterators.getSplits();
         scans = iterators.getScans();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index 46ade33..1bbda07 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -44,7 +44,9 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
 import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
 import org.apache.phoenix.iterate.MappedByteBufferQueue;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
 import org.apache.phoenix.parse.FilterableStatement;
@@ -114,10 +116,15 @@ public class SortMergeJoinPlan implements QueryPlan {
     }
 
     @Override
-    public ResultIterator iterator() throws SQLException {        
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {        
         return type == JoinType.Semi || type == JoinType.Anti ? 
-                new SemiAntiJoinIterator(lhsPlan.iterator(), rhsPlan.iterator()) :
-                new BasicJoinIterator(lhsPlan.iterator(), rhsPlan.iterator());
+                new SemiAntiJoinIterator(lhsPlan.iterator(scanGrouper), rhsPlan.iterator(scanGrouper)) :
+                new BasicJoinIterator(lhsPlan.iterator(scanGrouper), rhsPlan.iterator(scanGrouper));
+    }
+    
+    @Override
+    public ResultIterator iterator() throws SQLException {        
+        return iterator(DefaultParallelScanGrouper.getInstance());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
index c9cbd15..e8d9af0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
@@ -23,8 +23,10 @@ import java.util.List;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
 import org.apache.phoenix.iterate.DelegateResultIterator;
 import org.apache.phoenix.iterate.FilterResultIterator;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.schema.tuple.Tuple;
 
@@ -50,10 +52,15 @@ public class TupleProjectionPlan extends DelegateQueryPlan {
 
         return new ExplainPlan(planSteps);
     }
-
+    
     @Override
     public ResultIterator iterator() throws SQLException {
-        ResultIterator iterator = new DelegateResultIterator(delegate.iterator()) {
+    	return iterator(DefaultParallelScanGrouper.getInstance());
+    }
+
+    @Override
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
+        ResultIterator iterator = new DelegateResultIterator(delegate.iterator(scanGrouper)) {
             
             @Override
             public Tuple next() throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
index 2bed3a0..53745fe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
@@ -34,6 +34,7 @@ import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.iterate.ConcatResultIterator;
 import org.apache.phoenix.iterate.LimitingResultIterator;
 import org.apache.phoenix.iterate.MergeSortTopNResultIterator;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.UnionResultIterators;
 import org.apache.phoenix.parse.FilterableStatement;
@@ -123,6 +124,11 @@ public class UnionPlan implements QueryPlan {
     }
 
     @Override
+    public final ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
+        return iterator(Collections.<SQLCloseable>emptyList());
+    }
+    
+    @Override
     public final ResultIterator iterator() throws SQLException {
         return iterator(Collections.<SQLCloseable>emptyList());
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 43731cb..cf66d93 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -103,6 +103,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
     private final byte[] physicalTableName;
     private final QueryPlan plan;
     protected final String scanId;
+    private final ParallelScanGrouper scanGrouper;
     // TODO: too much nesting here - breakup into new classes.
     private final List<List<List<Pair<Scan,Future<PeekingResultIterator>>>>> allFutures;
     
@@ -133,9 +134,10 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         return true;
     }
     
-    public BaseResultIterators(QueryPlan plan, Integer perScanLimit) throws SQLException {
+    public BaseResultIterators(QueryPlan plan, Integer perScanLimit, ParallelScanGrouper scanGrouper) throws SQLException {
         super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), plan.getOrderBy(), plan.getStatement().getHint(), plan.getLimit());
         this.plan = plan;
+        this.scanGrouper = scanGrouper;
         StatementContext context = plan.getContext();
         TableRef tableRef = plan.getTableRef();
         PTable table = tableRef.getTable();
@@ -371,24 +373,11 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
     }
     
     private List<Scan> addNewScan(List<List<Scan>> parallelScans, List<Scan> scans, Scan scan, byte[] startKey, boolean crossedRegionBoundary) {
-        PTable table = getTable();
-        boolean startNewScanList = false;
-        if (!plan.isRowKeyOrdered()) {
-            startNewScanList = true;
-        } else if (crossedRegionBoundary) {
-            if (table.getIndexType() == IndexType.LOCAL) {
-                startNewScanList = true;
-            } else if (table.getBucketNum() != null) {
-                startNewScanList = scans.isEmpty() ||
-                        ScanUtil.crossesPrefixBoundary(startKey,
-                                ScanUtil.getPrefix(scans.get(scans.size()-1).getStartRow(), SaltingUtil.NUM_SALTING_BYTES), 
-                                SaltingUtil.NUM_SALTING_BYTES);
-            }
-        }
+        boolean startNewScan = scanGrouper.shouldStartNewScan(plan, scans, startKey, crossedRegionBoundary);
         if (scan != null) {
-            scans.add(scan);
+        	scans.add(scan);
         }
-        if (startNewScanList && !scans.isEmpty()) {
+        if (startNewScan && !scans.isEmpty()) {
             parallelScans.add(scans);
             scans = Lists.newArrayListWithExpectedSize(1);
         }
@@ -410,7 +399,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         Scan scan = context.getScan();
         List<HRegionLocation> regionLocations = context.getConnection().getQueryServices()
                 .getAllTableRegions(physicalTableName);
-        
         List<byte[]> regionBoundaries = toBoundaries(regionLocations);
         ScanRanges scanRanges = context.getScanRanges();
         PTable table = getTable();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java
new file mode 100644
index 0000000..5c7136f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.util.ScanUtil;
+
+/**
+ * Default implementation that creates a scan group if a plan is row key ordered (which requires a merge sort),
+ * or if a scan crosses a region boundary and the table is salted or a local index.   
+ */
+public class DefaultParallelScanGrouper implements ParallelScanGrouper {
+	
+	private static final DefaultParallelScanGrouper INSTANCE = new DefaultParallelScanGrouper();
+
+    public static DefaultParallelScanGrouper getInstance() {
+        return INSTANCE;
+    }
+    
+    private DefaultParallelScanGrouper() {}
+
+	@Override
+	public boolean shouldStartNewScan(QueryPlan plan, List<Scan> scans, byte[] startKey, boolean crossedRegionBoundary) {
+		PTable table = plan.getTableRef().getTable();
+		boolean startNewScanGroup = false;
+        if (!plan.isRowKeyOrdered()) {
+            startNewScanGroup = true;
+        } else if (crossedRegionBoundary) {
+            if (table.getIndexType() == IndexType.LOCAL) {
+                startNewScanGroup = true;
+            } else if (table.getBucketNum() != null) {
+                startNewScanGroup = scans.isEmpty() ||
+                        ScanUtil.crossesPrefixBoundary(startKey,
+                                ScanUtil.getPrefix(scans.get(scans.size()-1).getStartRow(), SaltingUtil.NUM_SALTING_BYTES), 
+                                SaltingUtil.NUM_SALTING_BYTES);
+            }
+        }
+        return startNewScanGroup;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
new file mode 100644
index 0000000..bf2666d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.QueryPlan;
+
+/**
+ * Scan grouper that creates a scan group if a plan is row key ordered or if a
+ * scan crosses region boundaries
+ */
+public class MapReduceParallelScanGrouper implements ParallelScanGrouper {
+	
+	private static final MapReduceParallelScanGrouper INSTANCE = new MapReduceParallelScanGrouper();
+
+    public static MapReduceParallelScanGrouper getInstance() {
+        return INSTANCE;
+    }
+    
+    private MapReduceParallelScanGrouper() {}
+
+	@Override
+	public boolean shouldStartNewScan(QueryPlan plan, List<Scan> scans,
+			byte[] startKey, boolean crossedRegionBoundary) {
+		return !plan.isRowKeyOrdered() || crossedRegionBoundary;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index 2dfbfe3..87f8335 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -54,11 +54,16 @@ public class ParallelIterators extends BaseResultIterators {
 	private static final String NAME = "PARALLEL";
     private final ParallelIteratorFactory iteratorFactory;
     
-    public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory)
+    public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper)
             throws SQLException {
-        super(plan, perScanLimit);
+        super(plan, perScanLimit, scanGrouper);
         this.iteratorFactory = iteratorFactory;
     }   
+    
+    public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory)
+            throws SQLException {
+        this(plan, perScanLimit, iteratorFactory, DefaultParallelScanGrouper.getInstance());
+    }  
 
     @Override
     protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScanGrouper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScanGrouper.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScanGrouper.java
new file mode 100644
index 0000000..0becf4f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScanGrouper.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.QueryPlan;
+
+/**
+ * Interface for a parallel scan grouper
+ */
+public interface ParallelScanGrouper {
+
+	/**
+	 * Determines whether to create a new group of parallel scans.
+	 * 	
+	 * @param scans						current scan group
+	 * @param plan						current query plan
+	 * @param startKey					start key of scan
+	 * @param crossedRegionBoundary		whether we crossed a region boundary
+	 * @return true if we should create a new group of scans
+	 */
+	boolean shouldStartNewScan(QueryPlan plan, List<Scan> scans, byte[] startKey, boolean crossedRegionBoundary);
+	
+}
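
For illustration only (not part of this commit), a hypothetical implementation of the new interface showing the contract: return true whenever the scans accumulated so far should be closed off into a group and a new one begun.

import java.util.List;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.iterate.ParallelScanGrouper;

/**
 * Hypothetical grouper that starts a new scan group at every region boundary,
 * regardless of whether the plan is row-key ordered.
 */
public class PerRegionScanGrouper implements ParallelScanGrouper {

    @Override
    public boolean shouldStartNewScan(QueryPlan plan, List<Scan> scans,
            byte[] startKey, boolean crossedRegionBoundary) {
        return crossedRegionBoundary;
    }
}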

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index 516d73e..fa18c83 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -51,9 +51,9 @@ public class SerialIterators extends BaseResultIterators {
 	private static final String NAME = "SERIAL";
     private final ParallelIteratorFactory iteratorFactory;
     
-    public SerialIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory)
+    public SerialIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper)
             throws SQLException {
-        super(plan, perScanLimit);
+        super(plan, perScanLimit, scanGrouper);
         Preconditions.checkArgument(perScanLimit != null); // must be a limit specified
         this.iteratorFactory = iteratorFactory;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index c6c5b0c..2bb3b92 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -71,6 +71,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.expression.RowKeyColumnExpression;
 import org.apache.phoenix.iterate.MaterializedResultIterator;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.parse.AddColumnStatement;
 import org.apache.phoenix.parse.AliasedNode;
@@ -446,6 +447,11 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                 public ResultIterator iterator() throws SQLException {
                     return iterator;
                 }
+                
+                @Override
+                public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
+                    return iterator;
+                }
 
                 @Override
                 public long getEstimatedSize() {
@@ -511,6 +517,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                 public boolean useRoundRobinIterator() throws SQLException {
                     return false;
                 }
+                
             };
         }
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
index 31759b4..8ee1634 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
@@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -36,6 +37,7 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.mapreduce.util.ConnectionUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
@@ -112,10 +114,10 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
             Preconditions.checkNotNull(selectStatement);
             final Statement statement = connection.createStatement();
             final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
-            // Optimize the query plan so that we potentially use secondary indexes
+            // Optimize the query plan so that we potentially use secondary indexes            
             final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
             // Initialize the query plan so it sets up the parallel scans
-            queryPlan.iterator();
+            queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
             return queryPlan;
         } catch (Exception exception) {
             LOG.error(String.format("Failed to get the query plan with error [%s]",

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
index b222fc9..caee3cd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.InputSplit;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a07d45a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index ecb088a..ad65373 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -44,6 +44,7 @@ import org.apache.phoenix.compile.SequenceManager;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.iterate.ParallelIterators;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.SpoolingResultIterator;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -368,6 +369,11 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest {
             }
 
             @Override
+            public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
+                return ResultIterator.EMPTY_ITERATOR;
+            }
+            
+            @Override
             public ResultIterator iterator() throws SQLException {
                 return ResultIterator.EMPTY_ITERATOR;
             }