Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/11/13 02:17:08 UTC

[3/7] phoenix git commit: PHOENIX-1432 Run limit query that has only leading PK column filter serially

PHOENIX-1432 Run limit query that has only leading PK column filter serially
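
For context, the class of query this targets is a LIMIT query whose only filter is on the
leading primary key column, so the compiled scan carries no HBase filter and the row limit
bounds how much data the scan can return. For example, the new KeyOnlyIT test below runs
"SELECT i1 FROM KEYONLY LIMIT 1" and now expects a plan beginning with
"CLIENT SERIAL 1-WAY FULL SCAN OVER KEYONLY" rather than "CLIENT PARALLEL 1-WAY", and the
HashJoinIT expected plans change the same way. ScanPlan only picks the serial path when it
estimates that the limit's worth of rows fits within a single region.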


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e77cded0
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e77cded0
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e77cded0

Branch: refs/heads/4.2
Commit: e77cded06a8b137a3c99b5cbc6982058119437c9
Parents: 05d23cb
Author: James Taylor <jt...@salesforce.com>
Authored: Wed Nov 12 10:02:20 2014 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Nov 12 17:03:57 2014 -0800

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/HashJoinIT.java  |   6 +-
 .../org/apache/phoenix/end2end/KeyOnlyIT.java   |  49 +-
 .../phoenix/end2end/StatsCollectorIT.java       |  16 +-
 .../MutatingParallelIteratorFactory.java        |   2 +-
 .../apache/phoenix/compile/QueryCompiler.java   |   2 +-
 .../coprocessor/MetaDataEndpointImpl.java       |   4 +-
 .../apache/phoenix/execute/AggregatePlan.java   |   2 +-
 .../apache/phoenix/execute/BaseQueryPlan.java   |   2 +-
 .../org/apache/phoenix/execute/ScanPlan.java    |  47 +-
 .../phoenix/iterate/BaseResultIterators.java    | 622 +++++++++++++++++++
 .../phoenix/iterate/ChunkedResultIterator.java  |  10 +-
 .../phoenix/iterate/ConcatResultIterator.java   |  37 +-
 .../phoenix/iterate/DelegateResultIterator.java |   4 +
 .../iterate/LimitingPeekingResultIterator.java  |  47 ++
 .../iterate/ParallelIteratorFactory.java        |  27 +
 .../phoenix/iterate/ParallelIterators.java      | 580 +----------------
 .../apache/phoenix/iterate/ResultIterators.java |   7 +-
 .../apache/phoenix/iterate/SerialIterators.java | 115 ++++
 .../phoenix/iterate/SpoolingResultIterator.java |   1 -
 .../apache/phoenix/optimize/QueryOptimizer.java |   2 +-
 .../phoenix/query/QueryServicesOptions.java     |   1 +
 .../schema/stats/StatisticsCollector.java       |  23 +-
 .../phoenix/schema/stats/StatisticsUtil.java    |  17 +
 .../phoenix/schema/stats/StatisticsWriter.java  |  30 +-
 .../org/apache/phoenix/util/SchemaUtil.java     |  24 +
 .../org/apache/phoenix/util/ServerUtil.java     |  34 +-
 .../iterate/AggregateResultScannerTest.java     |  11 +
 .../iterate/ConcatResultIteratorTest.java       |  21 +-
 .../iterate/MergeSortResultIteratorTest.java    |  12 +-
 .../org/apache/phoenix/query/QueryPlanTest.java |  51 ++
 .../phoenix/pig/hadoop/PhoenixRecordReader.java |   2 +-
 31 files changed, 1145 insertions(+), 663 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
index 3850ac9..304a062 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
@@ -395,7 +395,7 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                  *     LEFT JOIN joinItemTable i ON i.supplier_id = s.supplier_id 
                  *     LEFT JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 4
                  */
-                "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
+                "CLIENT SERIAL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
                 "    SERVER FILTER BY PageFilter 4\n" +
                 "    SERVER 4 ROW LIMIT\n" +
                 "CLIENT 4 ROW LIMIT\n" +
@@ -745,7 +745,7 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                  *     LEFT JOIN joinItemTable i ON i.supplier_id = s.supplier_id 
                  *     LEFT JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 4
                  */
-                "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
+                "CLIENT SERIAL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
                 "    SERVER FILTER BY PageFilter 4\n" +
                 "    SERVER 4 ROW LIMIT\n" +
                 "CLIENT 4 ROW LIMIT\n" +
@@ -1116,7 +1116,7 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                  *     LEFT JOIN joinItemTable i ON i.supplier_id = s.supplier_id 
                  *     LEFT JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 4
                  */
-                "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
+                "CLIENT SERIAL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
                 "    SERVER FILTER BY PageFilter 4\n" +
                 "    SERVER 4 ROW LIMIT\n" +
                 "CLIENT 4 ROW LIMIT\n" +

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java
index b7e3314..42ee5ad 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java
@@ -37,7 +37,9 @@ import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -51,7 +53,8 @@ public class KeyOnlyIT extends BaseOwnClusterClientManagedTimeIT {
     public static void doSetup() throws Exception {
         Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
         // Must update config before starting server
-        props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20));
+        props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(50));
+        props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(100));
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
     
@@ -79,7 +82,7 @@ public class KeyOnlyIT extends BaseOwnClusterClientManagedTimeIT {
         assertEquals(4, rs.getInt(2));
         assertFalse(rs.next());
         List<KeyRange> splits = getAllSplits(conn5, "KEYONLY");
-        assertEquals(3, splits.size());
+        assertEquals(2, splits.size());
         conn5.close();
         
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts+60));
@@ -159,6 +162,31 @@ public class KeyOnlyIT extends BaseOwnClusterClientManagedTimeIT {
         conn5.close();
     }
         
+    @Test
+    public void testQueryWithLimitAndStats() throws Exception {
+        long ts = nextTimestamp();
+        ensureTableCreated(getUrl(),KEYONLY_NAME,null, ts);
+        initTableValues(ts+1, 100);
+        
+        TestUtil.analyzeTable(getUrl(), ts+10, KEYONLY_NAME);
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts+50));
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String query = "SELECT i1 FROM KEYONLY LIMIT 1";
+        ResultSet rs = conn.createStatement().executeQuery(query);
+        assertTrue(rs.next());
+        assertEquals(0, rs.getInt(1));
+        assertFalse(rs.next());
+        
+        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+        assertEquals("CLIENT SERIAL 1-WAY FULL SCAN OVER KEYONLY\n" + 
+                "    SERVER FILTER BY PageFilter 1\n" + 
+                "    SERVER 1 ROW LIMIT\n" + 
+                "CLIENT 1 ROW LIMIT", QueryUtil.getExplainPlan(rs));
+        conn.close();
+    }
+    
     protected static void initTableValues(long ts) throws Exception {
         String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts;
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -177,4 +205,21 @@ public class KeyOnlyIT extends BaseOwnClusterClientManagedTimeIT {
         conn.commit();
         conn.close();
     }
+
+    protected static void initTableValues(long ts, int nRows) throws Exception {
+        String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts;
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(url, props);
+        PreparedStatement stmt = conn.prepareStatement(
+            "upsert into " +
+            "KEYONLY VALUES (?, ?)");
+        for (int i = 0; i < nRows; i++) {
+	        stmt.setInt(1, i);
+	        stmt.setInt(2, i+1);
+	        stmt.execute();
+        }
+        
+        conn.commit();
+        conn.close();
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
index faa54ea..cc16911 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
@@ -22,6 +22,7 @@ import static org.apache.phoenix.util.TestUtil.getAllSplits;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.sql.Array;
@@ -169,7 +170,6 @@ public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT {
         stmt.setArray(3, array);
         stmt.execute();
         conn.commit();
-        flush(tableName);
         stmt = upsertStmt(conn, tableName);
         stmt.setString(1, "b");
         s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" };
@@ -180,7 +180,6 @@ public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT {
         stmt.setArray(3, array);
         stmt.execute();
         conn.commit();
-        flush(tableName);
         stmt = upsertStmt(conn, tableName);
         stmt.setString(1, "c");
         s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" };
@@ -191,7 +190,6 @@ public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT {
         stmt.setArray(3, array);
         stmt.execute();
         conn.commit();
-        flush(tableName);
         stmt = upsertStmt(conn, tableName);
         stmt.setString(1, "d");
         s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" };
@@ -202,7 +200,6 @@ public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT {
         stmt.setArray(3, array);
         stmt.execute();
         conn.commit();
-        flush(tableName);
         stmt = upsertStmt(conn, tableName);
         stmt.setString(1, "b");
         s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" };
@@ -213,7 +210,6 @@ public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT {
         stmt.setArray(3, array);
         stmt.execute();
         conn.commit();
-        flush(tableName);
         stmt = upsertStmt(conn, tableName);
         stmt.setString(1, "e");
         s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" };
@@ -224,14 +220,9 @@ public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT {
         stmt.setArray(3, array);
         stmt.execute();
         conn.commit();
-        flush(tableName);
         return conn;
     }
 
-    private void flush(String tableName) throws IOException, InterruptedException {
-        //utility.getHBaseAdmin().flush(tableName.toUpperCase());
-    }
-
     private PreparedStatement upsertStmt(Connection conn, String tableName) throws SQLException {
         PreparedStatement stmt;
         stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)");
@@ -298,9 +289,12 @@ public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT {
                 nRegions = services.getAllTableRegions(STATS_TEST_TABLE_BYTES).size();
                 nTries++;
             } while (nRegions == nRegionsNow && nTries < 10);
+            if (nRegions == nRegionsNow) {
+                fail();
+            }
             // FIXME: I see the commit of the stats finishing before this with a lower timestamp than the scan timestamp,
             // yet without this sleep, the query finds the old data. Seems like an HBase bug and a potentially serious one.
-            Thread.sleep(3000);
+            Thread.sleep(8000);
         } finally {
             admin.close();
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
index 6388b1a..ba601ef 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
@@ -28,7 +28,7 @@ import java.util.List;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.execute.MutationState;
-import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
+import org.apache.phoenix.iterate.ParallelIteratorFactory;
 import org.apache.phoenix.iterate.PeekingResultIterator;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixConnection;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 4ca0bb5..108492a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -42,7 +42,7 @@ import org.apache.phoenix.execute.TupleProjectionPlan;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.RowValueConstructorExpression;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
+import org.apache.phoenix.iterate.ParallelIteratorFactory;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 7604663..335f4b2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -606,7 +606,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         if (tenantId == null) {
             HTableInterface statsHTable = null;
             try {
-                statsHTable = ServerUtil.getHTableForCoprocessorScan(env, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME);
+                statsHTable = ServerUtil.getHTableForCoprocessorScan(env, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES);
                 stats = StatisticsUtil.readStatistics(statsHTable, physicalTableName.getBytes(), clientTimeStamp);
                 timeStamp = Math.max(timeStamp, stats.getTimestamp()); 
             } catch (org.apache.hadoop.hbase.TableNotFoundException e) {
@@ -839,7 +839,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         // TableName systemCatalogTableName = region.getTableDesc().getTableName();
         // HTableInterface hTable = env.getTable(systemCatalogTableName);
         // These deprecated calls work around the issue
-        HTableInterface hTable = ServerUtil.getHTableForCoprocessorScan(env, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME);
+        HTableInterface hTable = ServerUtil.getHTableForCoprocessorScan(env, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
         try {
             boolean allViewsInCurrentRegion = true;
             int numOfChildViews = 0;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index d7a90fb..2ebfa41 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -42,8 +42,8 @@ import org.apache.phoenix.iterate.LimitingResultIterator;
 import org.apache.phoenix.iterate.MergeSortRowKeyResultIterator;
 import org.apache.phoenix.iterate.OrderedAggregatingResultIterator;
 import org.apache.phoenix.iterate.OrderedResultIterator;
+import org.apache.phoenix.iterate.ParallelIteratorFactory;
 import org.apache.phoenix.iterate.ParallelIterators;
-import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
 import org.apache.phoenix.iterate.PeekingResultIterator;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.SequenceResultIterator;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index d91ad51..33143bb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -44,7 +44,7 @@ import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.expression.ProjectedColumnExpression;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.iterate.DelegateResultIterator;
-import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
+import org.apache.phoenix.iterate.ParallelIteratorFactory;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.parse.FilterableStatement;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 12dc9ff..578855d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -21,6 +21,7 @@ package org.apache.phoenix.execute;
 import java.sql.SQLException;
 import java.util.List;
 
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
@@ -33,12 +34,15 @@ import org.apache.phoenix.iterate.ConcatResultIterator;
 import org.apache.phoenix.iterate.LimitingResultIterator;
 import org.apache.phoenix.iterate.MergeSortRowKeyResultIterator;
 import org.apache.phoenix.iterate.MergeSortTopNResultIterator;
+import org.apache.phoenix.iterate.ParallelIteratorFactory;
 import org.apache.phoenix.iterate.ParallelIterators;
-import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
 import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.ResultIterators;
 import org.apache.phoenix.iterate.SequenceResultIterator;
+import org.apache.phoenix.iterate.SerialIterators;
 import org.apache.phoenix.iterate.SpoolingResultIterator;
 import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
@@ -47,6 +51,9 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.stats.GuidePostsInfo;
+import org.apache.phoenix.schema.stats.StatisticsUtil;
+import org.apache.phoenix.util.SchemaUtil;
 
 
 
@@ -105,7 +112,8 @@ public class ScanPlan extends BaseQueryPlan {
     @Override
     protected ResultIterator newIterator() throws SQLException {
         // Set any scan attributes before creating the scanner, as it will be too late afterwards
-        context.getScan().setAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY, QueryConstants.TRUE);
+    	Scan scan = context.getScan();
+        scan.setAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY, QueryConstants.TRUE);
         ResultIterator scanner;
         TableRef tableRef = this.getTableRef();
         PTable table = tableRef.getTable();
@@ -114,7 +122,40 @@ public class ScanPlan extends BaseQueryPlan {
          * limit is provided, run query serially.
          */
         boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
-        ParallelIterators iterators = new ParallelIterators(this, !allowPageFilter || isOrdered ? null : limit, parallelIteratorFactory);
+        boolean isSerial = false;
+        Integer perScanLimit = !allowPageFilter || isOrdered ? null : limit;
+        /*
+         * If a limit is provided and we have no filter, run the scan serially when we estimate that
+         * the limit's worth of data will fit into a single region.
+         */
+        if (perScanLimit != null && scan.getFilter() == null) {
+        	GuidePostsInfo gpsInfo = table.getTableStats().getGuidePosts().get(SchemaUtil.getEmptyColumnFamily(table));
+            long estRowSize = SchemaUtil.estimateRowSize(table);
+        	long estRegionSize;
+        	if (gpsInfo == null) {
+        	    // Use guidepost depth as minimum size
+        	    ConnectionQueryServices services = context.getConnection().getQueryServices();
+        	    HTableDescriptor desc = services.getTableDescriptor(table.getName().getBytes());
+                int guidepostPerRegion = services.getProps().getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB,
+                        QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION);
+                long guidepostWidth = services.getProps().getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
+                        QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES);
+        	    estRegionSize = StatisticsUtil.getGuidePostDepth(guidepostPerRegion, guidepostWidth, desc);
+        	} else {
+        		// Region size estimated based on total number of bytes divided by number of regions
+        	    estRegionSize = gpsInfo.getByteCount() / (gpsInfo.getGuidePosts().size()+1);
+        	}
+            // TODO: configurable number of bytes?
+            if (perScanLimit * estRowSize < estRegionSize) {
+                isSerial = true;
+            }
+        }
+        ResultIterators iterators;
+        if (isSerial) {
+        	iterators = new SerialIterators(this, perScanLimit, parallelIteratorFactory);
+        } else {
+        	iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory);
+        }
         splits = iterators.getSplits();
         scans = iterators.getScans();
         if (isOrdered) {
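
The serial-vs-parallel decision added above reduces to a size estimate: when a per-scan limit
exists and the scan has no filter, run serially if limit * estimated-row-size is smaller than
the estimated region size (derived from guideposts, or from the configured guidepost depth when
the table has no stats yet). Below is a minimal standalone sketch of that check, not Phoenix
API; the class name, method name, and sample byte counts are illustrative only.

    public class SerialScanDecisionSketch {
        // estRowSize stands in for SchemaUtil.estimateRowSize(table); estRegionSize stands in
        // for the guidepost-derived estimate (byte count / (guidepost count + 1)) or the
        // guidepost depth when no stats exist. All values here are illustrative.
        static boolean runSerially(Integer perScanLimit, boolean scanHasFilter,
                long estRowSize, long estRegionSize) {
            if (perScanLimit == null || scanHasFilter) {
                return false; // no bounding limit, or a filter could skip arbitrarily many rows
            }
            return perScanLimit * estRowSize < estRegionSize;
        }

        public static void main(String[] args) {
            // LIMIT 1 over ~100-byte rows against a ~300-byte region estimate -> serial scan
            System.out.println(runSerially(1, false, 100L, 300L));     // true
            // a large limit no longer fits in one region -> keep the parallel iterators
            System.out.println(runSerially(10000, false, 100L, 300L)); // false
        }
    }

The hunk above then chooses SerialIterators or ParallelIterators accordingly.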

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
new file mode 100644
index 0000000..519c162
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY;
+import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.filter.ColumnProjectionFilter;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.parse.HintNode.Hint;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PColumnFamily;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.PTable.ViewType;
+import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.stats.GuidePostsInfo;
+import org.apache.phoenix.schema.stats.PTableStats;
+import org.apache.phoenix.util.LogUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SQLCloseables;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+
+/**
+ *
+ * Class that parallelizes the scan over a table using the ExecutorService provided.  Each region of the table will be scanned in parallel with
+ * the results accessible through {@link #getIterators()}
+ *
+ * 
+ * @since 0.1
+ */
+public abstract class BaseResultIterators extends ExplainTable implements ResultIterators {
+	private static final Logger logger = LoggerFactory.getLogger(BaseResultIterators.class);
+    private static final int DEFAULT_THREAD_TIMEOUT_MS = 60000; // 1min
+    private static final int ESTIMATED_GUIDEPOSTS_PER_REGION = 20;
+
+    private final List<List<Scan>> scans;
+    private final List<KeyRange> splits;
+    private final PTableStats tableStats;
+    private final byte[] physicalTableName;
+    private final QueryPlan plan;
+    
+    static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new Function<HRegionLocation, KeyRange>() {
+        @Override
+        public KeyRange apply(HRegionLocation region) {
+            return KeyRange.getKeyRange(region.getRegionInfo().getStartKey(), region.getRegionInfo().getEndKey());
+        }
+    };
+
+    private PTable getTable() {
+        return plan.getTableRef().getTable();
+    }
+    
+    private boolean useStats() {
+        Scan scan = context.getScan();
+        boolean isPointLookup = context.getScanRanges().isPointLookup();
+        /*
+         *  Don't use guide posts if:
+         *  1) We're doing a point lookup, as HBase is fast enough at those
+         *     to not need them to be further parallelized. TODO: perf test to verify
+         *  2) We're collecting stats, as in this case we need to scan entire
+         *     regions worth of data to track where to put the guide posts.
+         */
+        if (isPointLookup || ScanUtil.isAnalyzeTable(scan)) {
+            return false;
+        }
+        return true;
+    }
+    
+    public BaseResultIterators(QueryPlan plan, Integer perScanLimit)
+            throws SQLException {
+        super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), plan.getOrderBy(), plan.getStatement().getHint());
+        this.plan = plan;
+        StatementContext context = plan.getContext();
+        TableRef tableRef = plan.getTableRef();
+        PTable table = tableRef.getTable();
+        FilterableStatement statement = plan.getStatement();
+        RowProjector projector = plan.getProjector();
+        physicalTableName = table.getPhysicalName().getBytes();
+        tableStats = useStats() ? new MetaDataClient(context.getConnection()).getTableStats(table) : PTableStats.EMPTY_STATS;
+        Scan scan = context.getScan();
+        if (projector.isProjectEmptyKeyValue()) {
+            Map<byte [], NavigableSet<byte []>> familyMap = scan.getFamilyMap();
+            // If nothing projected into scan and we only have one column family, just allow everything
+            // to be projected and use a FirstKeyOnlyFilter to skip from row to row. This turns out to
+            // be quite a bit faster.
+            // Where condition columns will also get added into familyMap.
+            // When where conditions are present, we cannot add a FirstKeyOnlyFilter at the beginning.
+            if (familyMap.isEmpty() && context.getWhereCoditionColumns().isEmpty()
+                    && table.getColumnFamilies().size() == 1) {
+                // Project the one column family. We must project a column family since it's possible
+                // that there are other non declared column families that we need to ignore.
+                scan.addFamily(table.getColumnFamilies().get(0).getName().getBytes());
+                ScanUtil.andFilterAtBeginning(scan, new FirstKeyOnlyFilter());
+            } else {
+                byte[] ecf = SchemaUtil.getEmptyColumnFamily(table);
+                // Project empty key value unless the column family containing it has
+                // been projected in its entirety.
+                if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != null) {
+                    scan.addColumn(ecf, QueryConstants.EMPTY_COLUMN_BYTES);
+                }
+            }
+        } else if (table.getViewType() == ViewType.MAPPED) {
+            // Since we don't have the empty key value in MAPPED tables, we must select all CFs in HRS. But only the
+            // selected column values are returned back to client
+            for (PColumnFamily family : table.getColumnFamilies()) {
+                scan.addFamily(family.getName().getBytes());
+            }
+        }
+        
+        // TODO adding all CFs here is not correct. It should be done only after ColumnProjectionOptimization.
+        if (perScanLimit != null) {
+            ScanUtil.andFilterAtEnd(scan, new PageFilter(perScanLimit));
+        }
+
+        doColumnProjectionOptimization(context, scan, table, statement);
+        
+        this.scans = getParallelScans();
+        List<KeyRange> splitRanges = Lists.newArrayListWithExpectedSize(scans.size() * ESTIMATED_GUIDEPOSTS_PER_REGION);
+        for (List<Scan> scanList : scans) {
+            for (Scan aScan : scanList) {
+                splitRanges.add(KeyRange.getKeyRange(aScan.getStartRow(), aScan.getStopRow()));
+            }
+        }
+        this.splits = ImmutableList.copyOf(splitRanges);
+    }
+
+    private void doColumnProjectionOptimization(StatementContext context, Scan scan, PTable table, FilterableStatement statement) {
+        Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
+        if (familyMap != null && !familyMap.isEmpty()) {
+            // columnsTracker contains cf -> qualifiers which should get returned.
+            Map<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>> columnsTracker = 
+                    new TreeMap<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>>();
+            Set<byte[]> conditionOnlyCfs = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+            int referencedCfCount = familyMap.size();
+            for (Pair<byte[], byte[]> whereCol : context.getWhereCoditionColumns()) {
+                if (!(familyMap.containsKey(whereCol.getFirst()))) {
+                    referencedCfCount++;
+                }
+            }
+            boolean useOptimization;
+            if (statement.getHint().hasHint(Hint.SEEK_TO_COLUMN)) {
+                // Do not use the optimization
+                useOptimization = false;
+            } else if (statement.getHint().hasHint(Hint.NO_SEEK_TO_COLUMN)) {
+                // Strictly use the optimization
+                useOptimization = true;
+            } else {
+                // when referencedCfCount is >1 and no Hints, we are not using the optimization
+                useOptimization = referencedCfCount == 1;
+            }
+            if (useOptimization) {
+                for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
+                    ImmutableBytesPtr cf = new ImmutableBytesPtr(entry.getKey());
+                    NavigableSet<byte[]> qs = entry.getValue();
+                    NavigableSet<ImmutableBytesPtr> cols = null;
+                    if (qs != null) {
+                        cols = new TreeSet<ImmutableBytesPtr>();
+                        for (byte[] q : qs) {
+                            cols.add(new ImmutableBytesPtr(q));
+                        }
+                    }
+                    columnsTracker.put(cf, cols);
+                }
+            }
+            // Making sure that where condition CFs are getting scanned at HRS.
+            for (Pair<byte[], byte[]> whereCol : context.getWhereCoditionColumns()) {
+                if (useOptimization) {
+                    if (!(familyMap.containsKey(whereCol.getFirst()))) {
+                        scan.addFamily(whereCol.getFirst());
+                        conditionOnlyCfs.add(whereCol.getFirst());
+                    }
+                } else {
+                    if (familyMap.containsKey(whereCol.getFirst())) {
+                        // where column's CF is present. If there are some specific columns added against this CF, we
+                        // need to ensure this where column also gets added to it.
+                        // If the select was like select cf1.*, then that itself will select the whole CF. So no need to
+                        // specifically add the where column. Adding that will remove the cf1.* stuff and only this
+                        // where condition column will get returned!
+                        NavigableSet<byte[]> cols = familyMap.get(whereCol.getFirst());
+                        // cols is null means the whole CF will get scanned.
+                        if (cols != null) {
+                            scan.addColumn(whereCol.getFirst(), whereCol.getSecond());
+                        }
+                    } else {
+                        // where column's CF itself is not present in family map. We need to add the column
+                        scan.addColumn(whereCol.getFirst(), whereCol.getSecond());
+                    }
+                }
+            }
+            if (useOptimization && !columnsTracker.isEmpty()) {
+                for (ImmutableBytesPtr f : columnsTracker.keySet()) {
+                    // This addFamily will remove explicit cols in scan familyMap and make it as entire row.
+                    // We don't want the ExplicitColumnTracker to be used. Instead we have the ColumnProjectionFilter
+                    scan.addFamily(f.get());
+                }
+                // We don't need this filter for aggregates, as we're not returning back what's
+                // in the scan in this case. We still want the other optimization that causes
+                // the ExplicitColumnTracker not to be used, though.
+                if (!(statement.isAggregate())) {
+                    ScanUtil.andFilterAtEnd(scan, new ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table),
+                            columnsTracker, conditionOnlyCfs));
+                }
+            }
+        }
+    }
+
+    @Override
+    public List<KeyRange> getSplits() {
+        return splits;
+    }
+
+    @Override
+    public List<List<Scan>> getScans() {
+        return scans;
+    }
+
+    private static List<byte[]> toBoundaries(List<HRegionLocation> regionLocations) {
+        int nBoundaries = regionLocations.size() - 1;
+        List<byte[]> ranges = Lists.newArrayListWithExpectedSize(nBoundaries);
+        for (int i = 0; i < nBoundaries; i++) {
+            HRegionInfo regionInfo = regionLocations.get(i).getRegionInfo();
+            ranges.add(regionInfo.getEndKey());
+        }
+        return ranges;
+    }
+    
+    private static int getIndexContainingInclusive(List<byte[]> boundaries, byte[] inclusiveKey) {
+        int guideIndex = Collections.binarySearch(boundaries, inclusiveKey, Bytes.BYTES_COMPARATOR);
+        // If we found an exact match, return the index+1, as the inclusiveKey will be contained
+        // in the next region (since we're matching on the end boundary).
+        guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : (guideIndex + 1));
+        return guideIndex;
+    }
+    
+    private static int getIndexContainingExclusive(List<byte[]> boundaries, byte[] exclusiveKey) {
+        int guideIndex = Collections.binarySearch(boundaries, exclusiveKey, Bytes.BYTES_COMPARATOR);
+        // If we found an exact match, return the index we found as the exclusiveKey won't be
+        // contained in the next region as with getIndexContainingInclusive.
+        guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : guideIndex);
+        return guideIndex;
+    }
+    
+    private List<byte[]> getGuidePosts() {
+        /*
+         *  Don't use guide posts if:
+         *  1) We're doing a point lookup, as HBase is fast enough at those
+         *     to not need them to be further parallelized. TODO: perf test to verify
+         *  2) We're collecting stats, as in this case we need to scan entire
+         *     regions worth of data to track where to put the guide posts.
+         */
+        if (!useStats()) {
+            return Collections.emptyList();
+        }
+        
+        List<byte[]> gps = null;
+        PTable table = getTable();
+        Map<byte[],GuidePostsInfo> guidePostMap = tableStats.getGuidePosts();
+        byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(getTable());
+        if (table.getColumnFamilies().isEmpty()) {
+            // For sure we can get the defaultCF from the table
+            if (guidePostMap.get(defaultCF) != null) {
+                gps = guidePostMap.get(defaultCF).getGuidePosts();
+            }
+        } else {
+            Scan scan = context.getScan();
+            if (scan.getFamilyMap().size() > 0 && !scan.getFamilyMap().containsKey(defaultCF)) {
+                // If default CF is not used in scan, use first CF referenced in scan
+                GuidePostsInfo guidePostsInfo = guidePostMap.get(scan.getFamilyMap().keySet().iterator().next());
+                if (guidePostsInfo != null) {
+                    gps = guidePostsInfo.getGuidePosts();
+                }
+            } else {
+                // Otherwise, favor use of default CF.
+                if (guidePostMap.get(defaultCF) != null) {
+                    gps = guidePostMap.get(defaultCF).getGuidePosts();
+                }
+            }
+        }
+        if (gps == null) {
+            return Collections.emptyList();
+        }
+        return gps;
+    }
+    
+    private static String toString(List<byte[]> gps) {
+        StringBuilder buf = new StringBuilder(gps.size() * 100);
+        buf.append("[");
+        for (int i = 0; i < gps.size(); i++) {
+            buf.append(Bytes.toStringBinary(gps.get(i)));
+            buf.append(",");
+            if (i > 0 && i < gps.size()-1 && (i % 10) == 0) {
+                buf.append("\n");
+            }
+        }
+        buf.setCharAt(buf.length()-1, ']');
+        return buf.toString();
+    }
+    
+    private List<Scan> addNewScan(List<List<Scan>> parallelScans, List<Scan> scans, Scan scan, byte[] startKey, boolean crossedRegionBoundary) {
+        PTable table = getTable();
+        boolean startNewScanList = false;
+        if (!plan.isRowKeyOrdered()) {
+            startNewScanList = true;
+        } else if (crossedRegionBoundary) {
+            if (table.getIndexType() == IndexType.LOCAL) {
+                startNewScanList = true;
+            } else if (table.getBucketNum() != null) {
+                startNewScanList = scans.isEmpty() ||
+                        ScanUtil.crossesPrefixBoundary(startKey,
+                                ScanUtil.getPrefix(scans.get(scans.size()-1).getStartRow(), SaltingUtil.NUM_SALTING_BYTES), 
+                                SaltingUtil.NUM_SALTING_BYTES);
+            }
+        }
+        if (scan != null) {
+            scans.add(scan);
+        }
+        if (startNewScanList && !scans.isEmpty()) {
+            parallelScans.add(scans);
+            scans = Lists.newArrayListWithExpectedSize(1);
+        }
+        return scans;
+    }
+
+    private List<List<Scan>> getParallelScans() throws SQLException {
+        return getParallelScans(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
+    }
+
+    /**
+     * Compute the list of parallel scans to run for a given query. The inner scans
+     * may be concatenated together directly, while the other ones may need to be
+     * merge sorted, depending on the query.
+     * @return list of parallel scans to run for a given query.
+     * @throws SQLException
+     */
+    private List<List<Scan>> getParallelScans(byte[] startKey, byte[] stopKey) throws SQLException {
+        Scan scan = context.getScan();
+        List<HRegionLocation> regionLocations = context.getConnection().getQueryServices()
+                .getAllTableRegions(physicalTableName);
+        
+        List<byte[]> regionBoundaries = toBoundaries(regionLocations);
+        ScanRanges scanRanges = context.getScanRanges();
+        PTable table = getTable();
+        boolean isSalted = table.getBucketNum() != null;
+        boolean isLocalIndex = table.getIndexType() == IndexType.LOCAL;
+        List<byte[]> gps = getGuidePosts();
+        if (logger.isDebugEnabled()) {
+            logger.debug("Guideposts: " + toString(gps));
+        }
+        boolean traverseAllRegions = isSalted || isLocalIndex;
+        if (!traverseAllRegions) {
+            byte[] scanStartRow = scan.getStartRow();
+            if (scanStartRow.length != 0 && Bytes.compareTo(scanStartRow, startKey) > 0) {
+                startKey = scanStartRow;
+            }
+            byte[] scanStopRow = scan.getStopRow();
+            if (stopKey.length == 0 || Bytes.compareTo(scanStopRow, stopKey) < 0) {
+                stopKey = scanStopRow;
+            }
+        }
+        
+        int regionIndex = 0;
+        int stopIndex = regionBoundaries.size();
+        if (startKey.length > 0) {
+            regionIndex = getIndexContainingInclusive(regionBoundaries, startKey);
+        }
+        if (stopKey.length > 0) {
+            stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), stopKey));
+            if (isLocalIndex) {
+                stopKey = regionLocations.get(stopIndex).getRegionInfo().getEndKey();
+            }
+        }
+        List<List<Scan>> parallelScans = Lists.newArrayListWithExpectedSize(stopIndex - regionIndex + 1);
+        
+        byte[] currentKey = startKey;
+        int guideIndex = currentKey.length == 0 ? 0 : getIndexContainingInclusive(gps, currentKey);
+        int gpsSize = gps.size();
+        int estGuidepostsPerRegion = gpsSize == 0 ? 1 : gpsSize / regionLocations.size() + 1;
+        int keyOffset = 0;
+        List<Scan> scans = Lists.newArrayListWithExpectedSize(estGuidepostsPerRegion);
+        // Merge bisect with guideposts for all but the last region
+        while (regionIndex <= stopIndex) {
+            byte[] currentGuidePost, endKey, endRegionKey = EMPTY_BYTE_ARRAY;
+            if (regionIndex == stopIndex) {
+                endKey = stopKey;
+            } else {
+                endKey = regionBoundaries.get(regionIndex);
+            }
+            if (isLocalIndex) {
+                HRegionInfo regionInfo = regionLocations.get(regionIndex).getRegionInfo();
+                endRegionKey = regionInfo.getEndKey();
+                keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), endRegionKey);
+            }
+            while (guideIndex < gpsSize
+                    && (Bytes.compareTo(currentGuidePost = gps.get(guideIndex), endKey) <= 0 || endKey.length == 0)) {
+                Scan newScan = scanRanges.intersectScan(scan, currentKey, currentGuidePost, keyOffset, false);
+                scans = addNewScan(parallelScans, scans, newScan, currentGuidePost, false);
+                currentKey = currentGuidePost;
+                guideIndex++;
+            }
+            Scan newScan = scanRanges.intersectScan(scan, currentKey, endKey, keyOffset, true);
+            if (isLocalIndex) {
+                if (newScan != null) {
+                    newScan.setAttribute(EXPECTED_UPPER_REGION_KEY, endRegionKey);
+                } else if (!scans.isEmpty()) {
+                    scans.get(scans.size()-1).setAttribute(EXPECTED_UPPER_REGION_KEY, endRegionKey);
+                }
+            }
+            scans = addNewScan(parallelScans, scans, newScan, endKey, true);
+            currentKey = endKey;
+            regionIndex++;
+        }
+        if (!scans.isEmpty()) { // Add any remaining scans
+            parallelScans.add(scans);
+        }
+        if (logger.isDebugEnabled()) {
+            logger.debug(LogUtil.addCustomAnnotations("The parallelScans: " + parallelScans,
+                    ScanUtil.getCustomAnnotations(scan)));
+        }
+        return parallelScans;
+    }
+
+    public static <T> List<T> reverseIfNecessary(List<T> list, boolean reverse) {
+        if (!reverse) {
+            return list;
+        }
+        return Lists.reverse(list);
+    }
+    
+    /**
+     * Executes the scan in parallel across all regions, blocking until all scans are complete.
+     * @return the result iterators for the scan of each region
+     */
+    @Override
+    public List<PeekingResultIterator> getIterators() throws SQLException {
+        boolean success = false;
+        boolean isReverse = ScanUtil.isReversed(context.getScan());
+        boolean isLocalIndex = getTable().getIndexType() == IndexType.LOCAL;
+        final ConnectionQueryServices services = context.getConnection().getQueryServices();
+        ReadOnlyProps props = services.getProps();
+        int numSplits = size();
+        List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numSplits);
+        final List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numSplits);
+        // TODO: what purpose does this scanID serve?
+        final UUID scanId = UUID.randomUUID();
+        try {
+            submitWork(scanId, scans, futures, splits.size());
+            int timeoutMs = props.getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS);
+            boolean clearedCache = false;
+            for (List<Pair<Scan,Future<PeekingResultIterator>>> future : reverseIfNecessary(futures,isReverse)) {
+                List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(future.size());
+                for (Pair<Scan,Future<PeekingResultIterator>> scanPair : reverseIfNecessary(future,isReverse)) {
+                    try {
+                        PeekingResultIterator iterator = scanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
+                        concatIterators.add(iterator);
+                    } catch (ExecutionException e) {
+                        try { // Rethrow as SQLException
+                            throw ServerUtil.parseServerException(e);
+                        } catch (StaleRegionBoundaryCacheException e2) { // Catch only to try to recover from region boundary cache being out of date
+                            List<List<Pair<Scan,Future<PeekingResultIterator>>>> newFutures = Lists.newArrayListWithExpectedSize(2);
+                            if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries
+                                services.clearTableRegionCache(physicalTableName);
+                                clearedCache = true;
+                            }
+                            // Resubmit just this portion of work again
+                            Scan oldScan = scanPair.getFirst();
+                            byte[] startKey = oldScan.getStartRow();
+                            byte[] endKey = oldScan.getStopRow();
+                            if (isLocalIndex) {
+                                endKey = oldScan.getAttribute(EXPECTED_UPPER_REGION_KEY);
+                            }
+                            List<List<Scan>> newNestedScans = this.getParallelScans(startKey, endKey);
+                            // Add any concatIterators that were successful so far
+                            // as we need these to be in order
+                            addIterator(iterators, concatIterators);
+                            concatIterators = Collections.emptyList();
+                            submitWork(scanId, newNestedScans, newFutures, newNestedScans.size());
+                            for (List<Pair<Scan,Future<PeekingResultIterator>>> newFuture : reverseIfNecessary(newFutures, isReverse)) {
+                                for (Pair<Scan,Future<PeekingResultIterator>> newScanPair : reverseIfNecessary(newFuture, isReverse)) {
+                                    // Immediately do a get (not catching the exception again) and then add the iterators we
+                                    // get back immediately. They'll be sorted as expected, since they're replacing the
+                                    // original one.
+                                    PeekingResultIterator iterator = newScanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
+                                    iterators.add(iterator);
+                                }
+                            }
+                        }
+                    }
+                }
+                addIterator(iterators, concatIterators);
+            }
+
+            success = true;
+            return iterators;
+        } catch (SQLException e) {
+            throw e;
+        } catch (Exception e) {
+            throw ServerUtil.parseServerException(e);
+        } finally {
+            if (!success) {
+                SQLCloseables.closeAllQuietly(iterators);
+                // Don't call cancel on already started work, as it causes the HConnection
+                // to get into a funk. Instead, just cancel queued work.
+                for (List<Pair<Scan,Future<PeekingResultIterator>>> futureScans : futures) {
+                    for (Pair<Scan,Future<PeekingResultIterator>> futurePair : futureScans) {
+                        futurePair.getSecond().cancel(false);
+                    }
+                }
+            }
+        }
+    }
+    
+    private void addIterator(List<PeekingResultIterator> parentIterators, List<PeekingResultIterator> childIterators) {
+        if (!childIterators.isEmpty()) {
+            parentIterators.add(ConcatResultIterator.newIterator(childIterators));
+        }
+    }
+
+    protected static final class ScanLocator {
+    	private final int outerListIndex;
+    	private final int innerListIndex;
+    	private final Scan scan;
+    	
+    	public ScanLocator(Scan scan, int outerListIndex, int innerListIndex) {
+    		this.outerListIndex = outerListIndex;
+    		this.innerListIndex = innerListIndex;
+    		this.scan = scan;
+    	}
+    	public int getOuterListIndex() {
+    		return outerListIndex;
+    	}
+    	public int getInnerListIndex() {
+    		return innerListIndex;
+    	}
+    	public Scan getScan() {
+    		return scan;
+    	}
+    }
+    
+
+    abstract protected String getName();    
+    abstract protected void submitWork(final UUID scanId, List<List<Scan>> nestedScans,
+            List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, int estFlattenedSize);
+
+    @Override
+    public int size() {
+        return this.scans.size();
+    }
+
+    @Override
+    public void explain(List<String> planSteps) {
+        boolean displayChunkCount = context.getConnection().getQueryServices().getProps().getBoolean(
+                QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB,
+                QueryServicesOptions.DEFAULT_EXPLAIN_CHUNK_COUNT);
+        StringBuilder buf = new StringBuilder();
+        buf.append("CLIENT " + (displayChunkCount ? (this.splits.size() + "-CHUNK ") : "") + getName() + " " + size() + "-WAY ");
+        explain(buf.toString(),planSteps);
+    }
+
+	@Override
+	public String toString() {
+		return "ResultIterators [name=" + getName() + ",scans=" + scans + "]";
+	}
+}
\ No newline at end of file
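
For intuition about getIndexContainingInclusive/getIndexContainingExclusive above, here is a
minimal standalone sketch (plain Java, not Phoenix code; Strings stand in for byte[] region end
keys) of how a key is mapped onto the sorted list of region end keys via binary search.

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class BoundarySearchSketch {
        // boundaries holds the end keys of all regions but the last, in sorted order.
        static int indexContainingInclusive(List<String> boundaries, String inclusiveKey) {
            int i = Collections.binarySearch(boundaries, inclusiveKey);
            // an exact match on an end key means the key starts the *next* region
            return i < 0 ? -(i + 1) : i + 1;
        }

        static int indexContainingExclusive(List<String> boundaries, String exclusiveKey) {
            int i = Collections.binarySearch(boundaries, exclusiveKey);
            // an exact match means the exclusive stop key is still served by this region
            return i < 0 ? -(i + 1) : i;
        }

        public static void main(String[] args) {
            List<String> boundaries = Arrays.asList("d", "m", "t"); // regions [,d) [d,m) [m,t) [t,)
            System.out.println(indexContainingInclusive(boundaries, "d")); // 1: start key "d" lives in [d,m)
            System.out.println(indexContainingExclusive(boundaries, "d")); // 0: stop-before-"d" stays in [,d)
            System.out.println(indexContainingInclusive(boundaries, "q")); // 2: "q" falls in [m,t)
        }
    }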

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
index 4a62259..fecb0d1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
@@ -44,7 +44,7 @@ import com.google.common.base.Preconditions;
 public class ChunkedResultIterator implements PeekingResultIterator {
     private static final Logger logger = LoggerFactory.getLogger(ChunkedResultIterator.class);
 
-    private final ParallelIterators.ParallelIteratorFactory delegateIteratorFactory;
+    private final ParallelIteratorFactory delegateIteratorFactory;
     private ImmutableBytesWritable lastKey = new ImmutableBytesWritable();
     private final StatementContext context;
     private final TableRef tableRef;
@@ -52,12 +52,12 @@ public class ChunkedResultIterator implements PeekingResultIterator {
     private final long chunkSize;
     private PeekingResultIterator resultIterator;
 
-    public static class ChunkedResultIteratorFactory implements ParallelIterators.ParallelIteratorFactory {
+    public static class ChunkedResultIteratorFactory implements ParallelIteratorFactory {
 
-        private final ParallelIterators.ParallelIteratorFactory delegateFactory;
+        private final ParallelIteratorFactory delegateFactory;
         private final TableRef tableRef;
 
-        public ChunkedResultIteratorFactory(ParallelIterators.ParallelIteratorFactory
+        public ChunkedResultIteratorFactory(ParallelIteratorFactory
                 delegateFactory, TableRef tableRef) {
             this.delegateFactory = delegateFactory;
             this.tableRef = tableRef;
@@ -74,7 +74,7 @@ public class ChunkedResultIterator implements PeekingResultIterator {
         }
     }
 
-    public ChunkedResultIterator(ParallelIterators.ParallelIteratorFactory delegateIteratorFactory,
+    public ChunkedResultIterator(ParallelIteratorFactory delegateIteratorFactory,
             StatementContext context, TableRef tableRef, Scan scan, long chunkSize) throws SQLException {
         this.delegateIteratorFactory = delegateIteratorFactory;
         this.context = context;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
index cddf3b3..03f8785 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
@@ -39,8 +39,13 @@ public class ConcatResultIterator implements PeekingResultIterator {
         this.resultIterators = iterators;
     }
     
+    private ConcatResultIterator(List<PeekingResultIterator> iterators) {
+        this.resultIterators = null;
+        this.iterators = iterators;
+    }
+    
     private List<PeekingResultIterator> getIterators() throws SQLException {
-        if (iterators == null) {
+        if (iterators == null && resultIterators != null) {
             iterators = resultIterators.getIterators();
         }
         return iterators;
@@ -59,7 +64,9 @@ public class ConcatResultIterator implements PeekingResultIterator {
 
     @Override
     public void explain(List<String> planSteps) {
-        resultIterators.explain(planSteps);
+        if (resultIterators != null) {
+            resultIterators.explain(planSteps);
+        }
     }
 
     private PeekingResultIterator currentIterator() throws SQLException {
@@ -88,11 +95,11 @@ public class ConcatResultIterator implements PeekingResultIterator {
 
 	@Override
 	public String toString() {
-		return "ConcatResultIterator [resultIterators=" + resultIterators
-				+ ", iterators=" + iterators + ", index=" + index + "]";
+		return "ConcatResultIterator [" + resultIterators == null ? ("iterators=" + iterators) : ("resultIterators=" + resultIterators) 
+				+ ", index=" + index + "]";
 	}
 
-    public static PeekingResultIterator newConcatResultIterator(final List<PeekingResultIterator> concatIterators) {
+    public static PeekingResultIterator newIterator(final List<PeekingResultIterator> concatIterators) {
         if (concatIterators.isEmpty()) {
             return PeekingResultIterator.EMPTY_ITERATOR;
         } 
@@ -100,24 +107,6 @@ public class ConcatResultIterator implements PeekingResultIterator {
         if (concatIterators.size() == 1) {
             return concatIterators.get(0);
         }
-        return new ConcatResultIterator(new ResultIterators() {
-
-            @Override
-            public List<PeekingResultIterator> getIterators() throws SQLException {
-                return concatIterators;
-            }
-
-            @Override
-            public int size() {
-                return concatIterators.size();
-            }
-
-            @Override
-            public void explain(List<String> planSteps) {
-                // TODO: review what we should for explain plan here
-                concatIterators.get(0).explain(planSteps);
-            }
-            
-        });
+        return new ConcatResultIterator(concatIterators);
     }
 }
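
A hedged usage sketch for the renamed factory method: the class and helper names
below are invented, and the snippet assumes phoenix-core is on the classpath. It
exercises only the behavior visible in the method above (empty list, single
element, or a concatenating wrapper over the pre-materialized list):

    import java.util.Collections;
    import java.util.List;

    import org.apache.phoenix.iterate.ConcatResultIterator;
    import org.apache.phoenix.iterate.PeekingResultIterator;

    public class ConcatUsageSketch {
        static PeekingResultIterator concat(List<PeekingResultIterator> perScanIterators) {
            // Empty list  -> PeekingResultIterator.EMPTY_ITERATOR
            // One element -> that iterator, returned as-is
            // Otherwise   -> a ConcatResultIterator over the pre-materialized list
            return ConcatResultIterator.newIterator(perScanIterators);
        }

        public static void main(String[] args) {
            List<PeekingResultIterator> none = Collections.emptyList();
            System.out.println(concat(none) == PeekingResultIterator.EMPTY_ITERATOR); // prints true
        }
    }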

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/iterate/DelegateResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DelegateResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DelegateResultIterator.java
index 98b95c6..63b3142 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DelegateResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DelegateResultIterator.java
@@ -30,6 +30,10 @@ public class DelegateResultIterator implements ResultIterator {
         this.delegate = delegate;
     }
     
+    protected ResultIterator getDelegate() {
+    	return delegate;
+    }
+    
     @Override
     public void close() throws SQLException {
         delegate.close();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingPeekingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingPeekingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingPeekingResultIterator.java
new file mode 100644
index 0000000..a80693d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingPeekingResultIterator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * 
+ * Iterates through tuples up to a limit
+ *
+ * 
+ * @since 1.2
+ */
+public class LimitingPeekingResultIterator extends LimitingResultIterator implements PeekingResultIterator {
+    
+    public LimitingPeekingResultIterator(PeekingResultIterator delegate, int limit) {
+        super(delegate, limit);
+    }
+
+    
+    @Override
+    protected PeekingResultIterator getDelegate() {
+    	return (PeekingResultIterator) super.getDelegate();
+    }
+    
+	@Override
+	public Tuple peek() throws SQLException {
+		return getDelegate().peek();
+	}
+}
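
A small usage sketch for the new wrapper: the class name is invented, the empty
delegate merely stands in for a real per-scan iterator, and phoenix-core is
assumed to be on the classpath:

    import org.apache.phoenix.iterate.LimitingPeekingResultIterator;
    import org.apache.phoenix.iterate.PeekingResultIterator;
    import org.apache.phoenix.schema.tuple.Tuple;

    public class LimitSketch {
        public static void main(String[] args) throws Exception {
            // Stand-in delegate; a real caller would pass the iterator built for one scan.
            PeekingResultIterator delegate = PeekingResultIterator.EMPTY_ITERATOR;
            // Return at most 10 tuples while still supporting peek().
            PeekingResultIterator limited = new LimitingPeekingResultIterator(delegate, 10);
            Tuple first = limited.peek(); // null here, since the delegate is empty
            System.out.println(first);
            limited.close();
        }
    }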

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e77cded0/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
new file mode 100644
index 0000000..1ad3af0
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.StatementContext;
+
+public interface ParallelIteratorFactory {
+    PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException;
+}
\ No newline at end of file
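
To make the contract concrete, here is a hypothetical implementation: it is not
part of this commit, the class name is invented, and it simply adds one tuple of
lookahead over the raw per-scan ResultIterator. The real factories (for example
ChunkedResultIteratorFactory above) do considerably more:

    import java.sql.SQLException;
    import java.util.List;

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.phoenix.compile.StatementContext;
    import org.apache.phoenix.iterate.ParallelIteratorFactory;
    import org.apache.phoenix.iterate.PeekingResultIterator;
    import org.apache.phoenix.iterate.ResultIterator;
    import org.apache.phoenix.schema.tuple.Tuple;

    public class SimplePeekingIteratorFactory implements ParallelIteratorFactory {
        @Override
        public PeekingResultIterator newIterator(StatementContext context, final ResultIterator scanner, Scan scan)
                throws SQLException {
            final Tuple first = scanner.next(); // prime one tuple of lookahead
            return new PeekingResultIterator() {
                private Tuple next = first;

                @Override
                public Tuple peek() {
                    return next;
                }

                @Override
                public Tuple next() throws SQLException {
                    Tuple current = next;
                    next = current == null ? null : scanner.next();
                    return current;
                }

                @Override
                public void explain(List<String> planSteps) {
                    scanner.explain(planSteps);
                }

                @Override
                public void close() throws SQLException {
                    scanner.close();
                }
            };
        }
    }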