Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/11/12 19:02:38 UTC

[1/3] phoenix git commit: PHOENIX-1434 Stats are not maintained correctly through splits (Mujtaba)

Repository: phoenix
Updated Branches:
  refs/heads/4.0 4c2f37160 -> dbd3d6257


PHOENIX-1434 Stats are not maintained correctly through splits (Mujtaba)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/127a92d4
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/127a92d4
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/127a92d4

Branch: refs/heads/4.0
Commit: 127a92d4e2a460d5192dc3e6d65396d605a04278
Parents: 4c2f371
Author: James Taylor <jt...@salesforce.com>
Authored: Wed Nov 12 09:06:11 2014 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Nov 12 09:06:11 2014 -0800

----------------------------------------------------------------------
 .../phoenix/schema/stats/StatisticsCollector.java       |  4 ++--
 .../apache/phoenix/schema/stats/StatisticsScanner.java  |  4 ++--
 .../apache/phoenix/schema/stats/StatisticsWriter.java   | 12 ++++++------
 3 files changed, 10 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/127a92d4/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
index 5212ff9..90c8324 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
@@ -123,13 +123,13 @@ public class StatisticsCollector {
                     if(logger.isDebugEnabled()) {
                         logger.debug("Deleting the stats for the region "+region.getRegionInfo());
                     }
-                    statsTable.deleteStats(region.getRegionInfo().getRegionNameAsString(), this, Bytes.toString(fam.copyBytesIfNecessary()),
+                    statsTable.deleteStats(region.getRegionInfo().getRegionName(), this, Bytes.toString(fam.copyBytesIfNecessary()),
                             mutations);
                 }
                 if(logger.isDebugEnabled()) {
                     logger.debug("Adding new stats for the region "+region.getRegionInfo());
                 }
-                statsTable.addStats((region.getRegionInfo().getRegionNameAsString()), this, Bytes.toString(fam.copyBytesIfNecessary()),
+                statsTable.addStats((region.getRegionInfo().getRegionName()), this, Bytes.toString(fam.copyBytesIfNecessary()),
                         mutations);
             }
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/127a92d4/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
index 3a84cfc..fa3930d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
@@ -83,12 +83,12 @@ public class StatisticsScanner implements InternalScanner {
                 LOG.debug("Deleting the stats for the region " + region.getRegionNameAsString()
                         + " as part of major compaction");
             }
-            stats.deleteStats(region.getRegionNameAsString(), this.tracker, Bytes.toString(family), mutations);
+            stats.deleteStats(region.getRegionName(), this.tracker, Bytes.toString(family), mutations);
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Adding new stats for the region " + region.getRegionNameAsString()
                         + " as part of major compaction");
             }
-            stats.addStats(region.getRegionNameAsString(), this.tracker, Bytes.toString(family), mutations);
+            stats.addStats(region.getRegionName(), this.tracker, Bytes.toString(family), mutations);
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Committing new stats for the region " + region.getRegionNameAsString()
                         + " as part of major compaction");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/127a92d4/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
index 9e2d659..6681042 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
@@ -130,13 +130,13 @@ public class StatisticsWriter implements Closeable {
 	                GuidePostsInfo lguidePosts = new GuidePostsInfo(byteSize, guidePosts.getGuidePosts().subList(0, midEndIndex));
 	                tracker.clear();
 	                tracker.addGuidePost(fam, lguidePosts, byteSize, cell.getTimestamp());
-	                addStats(l.getRegionNameAsString(), tracker, fam, mutations);
+	                addStats(l.getRegionName(), tracker, fam, mutations);
 	            }
 	            if (midStartIndex < guidePosts.getGuidePosts().size()) {
 	                GuidePostsInfo rguidePosts = new GuidePostsInfo(byteSize, guidePosts.getGuidePosts().subList(midStartIndex, guidePosts.getGuidePosts().size()));
 	                tracker.clear();
 	                tracker.addGuidePost(fam, rguidePosts, byteSize, cell.getTimestamp());
-	                addStats(r.getRegionNameAsString(), tracker, fam, mutations);
+	                addStats(r.getRegionName(), tracker, fam, mutations);
 	            }
         	}
         }
@@ -157,7 +157,7 @@ public class StatisticsWriter implements Closeable {
      *             if we fail to do any of the puts. Any single failure will prevent any future attempts for the remaining list of stats to
      *             update
      */
-    public void addStats(String regionName, StatisticsCollector tracker, String fam, List<Mutation> mutations) throws IOException {
+    public void addStats(byte[] regionName, StatisticsCollector tracker, String fam, List<Mutation> mutations) throws IOException {
         if (tracker == null) { return; }
         boolean useMaxTimeStamp = clientTimeStamp == StatisticsCollector.NO_TIMESTAMP;
         long timeStamp = clientTimeStamp;
@@ -166,7 +166,7 @@ public class StatisticsWriter implements Closeable {
             mutations.add(getLastStatsUpdatedTimePut(timeStamp));
         }
         byte[] prefix = StatisticsUtil.getRowKey(tableName, PDataType.VARCHAR.toBytes(fam),
-                PDataType.VARCHAR.toBytes(regionName));
+                regionName);
         Put put = new Put(prefix);
         GuidePostsInfo gp = tracker.getGuidePosts(fam);
         if (gp != null) {
@@ -238,11 +238,11 @@ public class StatisticsWriter implements Closeable {
         statisticsTable.put(put);
     }
     
-    public void deleteStats(String regionName, StatisticsCollector tracker, String fam, List<Mutation> mutations)
+    public void deleteStats(byte[] regionName, StatisticsCollector tracker, String fam, List<Mutation> mutations)
             throws IOException {
         long timeStamp = clientTimeStamp == StatisticsCollector.NO_TIMESTAMP ? tracker.getMaxTimeStamp() : clientTimeStamp;
         byte[] prefix = StatisticsUtil.getRowKey(tableName, PDataType.VARCHAR.toBytes(fam),
-                PDataType.VARCHAR.toBytes(regionName));
+                regionName);
         mutations.add(new Delete(prefix, timeStamp - 1));
     }
 }
\ No newline at end of file
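
To see why this commit switches from the region name's string form to its raw bytes when keying stats rows, here is a minimal standalone sketch against the HBase Bytes utility (not Phoenix code; the sample region name and class name are made up, but real region names embed start keys that may contain arbitrary bytes, and getRegionNameAsString() returns an escaped, printable rendering of them):

    import org.apache.hadoop.hbase.util.Bytes;

    public class RegionNameKeySketch {
        public static void main(String[] args) {
            // Hypothetical region name: table name, a binary start key, timestamp/encoded name.
            byte[] regionName = Bytes.add(Bytes.toBytes("TBL,"),
                    new byte[] { 0x00, (byte) 0xA0, 0x7F },
                    Bytes.toBytes(",1415812345678.abc123."));
            // Escaped, human-readable rendering of the same bytes.
            String asString = Bytes.toStringBinary(regionName);
            // Re-encoding the escaped string does not recover the original bytes.
            byte[] reEncoded = Bytes.toBytes(asString);
            System.out.println(Bytes.equals(regionName, reEncoded)); // prints: false
        }
    }

With addStats and deleteStats now taking byte[] regionName, StatisticsUtil.getRowKey receives the region name bytes directly rather than PDataType.VARCHAR.toBytes applied to the escaped string, so the stats row key is built from the exact region name bytes.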


[3/3] phoenix git commit: PHOENIX-1432 Run limit query that has only leading PK column filter serially

Posted by ja...@apache.org.
PHOENIX-1432 Run limit query that has only leading PK column filter serially


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/dbd3d625
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/dbd3d625
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/dbd3d625

Branch: refs/heads/4.0
Commit: dbd3d6257302326320da2616ea965a60b91f19c7
Parents: 127a92d
Author: James Taylor <jt...@salesforce.com>
Authored: Wed Nov 12 10:02:20 2014 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Nov 12 10:02:20 2014 -0800

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/HashJoinIT.java  |   6 +-
 .../org/apache/phoenix/end2end/KeyOnlyIT.java   |  49 +-
 .../phoenix/end2end/StatsCollectorIT.java       |  16 +-
 .../MutatingParallelIteratorFactory.java        |   2 +-
 .../apache/phoenix/compile/QueryCompiler.java   |   2 +-
 .../coprocessor/MetaDataEndpointImpl.java       |   4 +-
 .../apache/phoenix/execute/AggregatePlan.java   |   2 +-
 .../apache/phoenix/execute/BaseQueryPlan.java   |   2 +-
 .../org/apache/phoenix/execute/ScanPlan.java    |  47 +-
 .../phoenix/iterate/BaseResultIterators.java    | 622 +++++++++++++++++++
 .../phoenix/iterate/ChunkedResultIterator.java  |  10 +-
 .../phoenix/iterate/ConcatResultIterator.java   |  37 +-
 .../phoenix/iterate/DelegateResultIterator.java |   4 +
 .../iterate/LimitingPeekingResultIterator.java  |  47 ++
 .../iterate/ParallelIteratorFactory.java        |  27 +
 .../phoenix/iterate/ParallelIterators.java      | 580 +----------------
 .../apache/phoenix/iterate/ResultIterators.java |   7 +-
 .../apache/phoenix/iterate/SerialIterators.java | 115 ++++
 .../phoenix/iterate/SpoolingResultIterator.java |   1 -
 .../apache/phoenix/optimize/QueryOptimizer.java |   2 +-
 .../phoenix/query/QueryServicesOptions.java     |   1 +
 .../schema/stats/StatisticsCollector.java       |  23 +-
 .../phoenix/schema/stats/StatisticsUtil.java    |  17 +
 .../phoenix/schema/stats/StatisticsWriter.java  |  30 +-
 .../org/apache/phoenix/util/SchemaUtil.java     |  24 +
 .../org/apache/phoenix/util/ServerUtil.java     |  34 +-
 .../iterate/AggregateResultScannerTest.java     |  11 +
 .../iterate/ConcatResultIteratorTest.java       |  21 +-
 .../iterate/MergeSortResultIteratorTest.java    |  12 +-
 .../org/apache/phoenix/query/QueryPlanTest.java |  51 ++
 .../phoenix/pig/hadoop/PhoenixRecordReader.java |   2 +-
 31 files changed, 1145 insertions(+), 663 deletions(-)
----------------------------------------------------------------------
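
The observable effect, as exercised by the new KeyOnlyIT test and the updated HashJoinIT expectations below, is that a limit-only query such as

    SELECT i1 FROM KEYONLY LIMIT 1

now compiles to

    CLIENT SERIAL 1-WAY FULL SCAN OVER KEYONLY
        SERVER FILTER BY PageFilter 1
        SERVER 1 ROW LIMIT
    CLIENT 1 ROW LIMIT

where it previously produced a CLIENT PARALLEL 1-WAY plan.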


http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd3d625/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
index 5190c18..ec22360 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
@@ -394,7 +394,7 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                  *     LEFT JOIN joinItemTable i ON i.supplier_id = s.supplier_id 
                  *     LEFT JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 4
                  */
-                "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
+                "CLIENT SERIAL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
                 "    SERVER FILTER BY PageFilter 4\n" +
                 "    SERVER 4 ROW LIMIT\n" +
                 "CLIENT 4 ROW LIMIT\n" +
@@ -744,7 +744,7 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                  *     LEFT JOIN joinItemTable i ON i.supplier_id = s.supplier_id 
                  *     LEFT JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 4
                  */
-                "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
+                "CLIENT SERIAL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
                 "    SERVER FILTER BY PageFilter 4\n" +
                 "    SERVER 4 ROW LIMIT\n" +
                 "CLIENT 4 ROW LIMIT\n" +
@@ -1115,7 +1115,7 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                  *     LEFT JOIN joinItemTable i ON i.supplier_id = s.supplier_id 
                  *     LEFT JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 4
                  */
-                "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
+                "CLIENT SERIAL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
                 "    SERVER FILTER BY PageFilter 4\n" +
                 "    SERVER 4 ROW LIMIT\n" +
                 "CLIENT 4 ROW LIMIT\n" +

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd3d625/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java
index b7e3314..42ee5ad 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java
@@ -37,7 +37,9 @@ import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -51,7 +53,8 @@ public class KeyOnlyIT extends BaseOwnClusterClientManagedTimeIT {
     public static void doSetup() throws Exception {
         Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
         // Must update config before starting server
-        props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20));
+        props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(50));
+        props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(100));
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
     
@@ -79,7 +82,7 @@ public class KeyOnlyIT extends BaseOwnClusterClientManagedTimeIT {
         assertEquals(4, rs.getInt(2));
         assertFalse(rs.next());
         List<KeyRange> splits = getAllSplits(conn5, "KEYONLY");
-        assertEquals(3, splits.size());
+        assertEquals(2, splits.size());
         conn5.close();
         
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts+60));
@@ -159,6 +162,31 @@ public class KeyOnlyIT extends BaseOwnClusterClientManagedTimeIT {
         conn5.close();
     }
         
+    @Test
+    public void testQueryWithLimitAndStats() throws Exception {
+        long ts = nextTimestamp();
+        ensureTableCreated(getUrl(),KEYONLY_NAME,null, ts);
+        initTableValues(ts+1, 100);
+        
+        TestUtil.analyzeTable(getUrl(), ts+10, KEYONLY_NAME);
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts+50));
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String query = "SELECT i1 FROM KEYONLY LIMIT 1";
+        ResultSet rs = conn.createStatement().executeQuery(query);
+        assertTrue(rs.next());
+        assertEquals(0, rs.getInt(1));
+        assertFalse(rs.next());
+        
+        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+        assertEquals("CLIENT SERIAL 1-WAY FULL SCAN OVER KEYONLY\n" + 
+                "    SERVER FILTER BY PageFilter 1\n" + 
+                "    SERVER 1 ROW LIMIT\n" + 
+                "CLIENT 1 ROW LIMIT", QueryUtil.getExplainPlan(rs));
+        conn.close();
+    }
+    
     protected static void initTableValues(long ts) throws Exception {
         String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts;
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -177,4 +205,21 @@ public class KeyOnlyIT extends BaseOwnClusterClientManagedTimeIT {
         conn.commit();
         conn.close();
     }
+
+    protected static void initTableValues(long ts, int nRows) throws Exception {
+        String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts;
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(url, props);
+        PreparedStatement stmt = conn.prepareStatement(
+            "upsert into " +
+            "KEYONLY VALUES (?, ?)");
+        for (int i = 0; i < nRows; i++) {
+	        stmt.setInt(1, i);
+	        stmt.setInt(2, i+1);
+	        stmt.execute();
+        }
+        
+        conn.commit();
+        conn.close();
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd3d625/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
index faa54ea..cc16911 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
@@ -22,6 +22,7 @@ import static org.apache.phoenix.util.TestUtil.getAllSplits;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.sql.Array;
@@ -169,7 +170,6 @@ public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT {
         stmt.setArray(3, array);
         stmt.execute();
         conn.commit();
-        flush(tableName);
         stmt = upsertStmt(conn, tableName);
         stmt.setString(1, "b");
         s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" };
@@ -180,7 +180,6 @@ public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT {
         stmt.setArray(3, array);
         stmt.execute();
         conn.commit();
-        flush(tableName);
         stmt = upsertStmt(conn, tableName);
         stmt.setString(1, "c");
         s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" };
@@ -191,7 +190,6 @@ public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT {
         stmt.setArray(3, array);
         stmt.execute();
         conn.commit();
-        flush(tableName);
         stmt = upsertStmt(conn, tableName);
         stmt.setString(1, "d");
         s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" };
@@ -202,7 +200,6 @@ public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT {
         stmt.setArray(3, array);
         stmt.execute();
         conn.commit();
-        flush(tableName);
         stmt = upsertStmt(conn, tableName);
         stmt.setString(1, "b");
         s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" };
@@ -213,7 +210,6 @@ public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT {
         stmt.setArray(3, array);
         stmt.execute();
         conn.commit();
-        flush(tableName);
         stmt = upsertStmt(conn, tableName);
         stmt.setString(1, "e");
         s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" };
@@ -224,14 +220,9 @@ public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT {
         stmt.setArray(3, array);
         stmt.execute();
         conn.commit();
-        flush(tableName);
         return conn;
     }
 
-    private void flush(String tableName) throws IOException, InterruptedException {
-        //utility.getHBaseAdmin().flush(tableName.toUpperCase());
-    }
-
     private PreparedStatement upsertStmt(Connection conn, String tableName) throws SQLException {
         PreparedStatement stmt;
         stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)");
@@ -298,9 +289,12 @@ public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT {
                 nRegions = services.getAllTableRegions(STATS_TEST_TABLE_BYTES).size();
                 nTries++;
             } while (nRegions == nRegionsNow && nTries < 10);
+            if (nRegions == nRegionsNow) {
+                fail();
+            }
             // FIXME: I see the commit of the stats finishing before this with a lower timestamp that the scan timestamp,
             // yet without this sleep, the query finds the old data. Seems like an HBase bug and a potentially serious one.
-            Thread.sleep(3000);
+            Thread.sleep(8000);
         } finally {
             admin.close();
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd3d625/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
index 6388b1a..ba601ef 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
@@ -28,7 +28,7 @@ import java.util.List;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.execute.MutationState;
-import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
+import org.apache.phoenix.iterate.ParallelIteratorFactory;
 import org.apache.phoenix.iterate.PeekingResultIterator;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixConnection;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd3d625/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 43a13ac..e76c05c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -46,7 +46,7 @@ import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.expression.RowValueConstructorExpression;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
+import org.apache.phoenix.iterate.ParallelIteratorFactory;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd3d625/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 7604663..335f4b2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -606,7 +606,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         if (tenantId == null) {
             HTableInterface statsHTable = null;
             try {
-                statsHTable = ServerUtil.getHTableForCoprocessorScan(env, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME);
+                statsHTable = ServerUtil.getHTableForCoprocessorScan(env, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES);
                 stats = StatisticsUtil.readStatistics(statsHTable, physicalTableName.getBytes(), clientTimeStamp);
                 timeStamp = Math.max(timeStamp, stats.getTimestamp()); 
             } catch (org.apache.hadoop.hbase.TableNotFoundException e) {
@@ -839,7 +839,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         // TableName systemCatalogTableName = region.getTableDesc().getTableName();
         // HTableInterface hTable = env.getTable(systemCatalogTableName);
         // These deprecated calls work around the issue
-        HTableInterface hTable = ServerUtil.getHTableForCoprocessorScan(env, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME);
+        HTableInterface hTable = ServerUtil.getHTableForCoprocessorScan(env, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
         try {
             boolean allViewsInCurrentRegion = true;
             int numOfChildViews = 0;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd3d625/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index d7a90fb..2ebfa41 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -42,8 +42,8 @@ import org.apache.phoenix.iterate.LimitingResultIterator;
 import org.apache.phoenix.iterate.MergeSortRowKeyResultIterator;
 import org.apache.phoenix.iterate.OrderedAggregatingResultIterator;
 import org.apache.phoenix.iterate.OrderedResultIterator;
+import org.apache.phoenix.iterate.ParallelIteratorFactory;
 import org.apache.phoenix.iterate.ParallelIterators;
-import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
 import org.apache.phoenix.iterate.PeekingResultIterator;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.SequenceResultIterator;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd3d625/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index d91ad51..33143bb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -44,7 +44,7 @@ import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.expression.ProjectedColumnExpression;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.iterate.DelegateResultIterator;
-import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
+import org.apache.phoenix.iterate.ParallelIteratorFactory;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.parse.FilterableStatement;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd3d625/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 12dc9ff..578855d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -21,6 +21,7 @@ package org.apache.phoenix.execute;
 import java.sql.SQLException;
 import java.util.List;
 
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
@@ -33,12 +34,15 @@ import org.apache.phoenix.iterate.ConcatResultIterator;
 import org.apache.phoenix.iterate.LimitingResultIterator;
 import org.apache.phoenix.iterate.MergeSortRowKeyResultIterator;
 import org.apache.phoenix.iterate.MergeSortTopNResultIterator;
+import org.apache.phoenix.iterate.ParallelIteratorFactory;
 import org.apache.phoenix.iterate.ParallelIterators;
-import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
 import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.ResultIterators;
 import org.apache.phoenix.iterate.SequenceResultIterator;
+import org.apache.phoenix.iterate.SerialIterators;
 import org.apache.phoenix.iterate.SpoolingResultIterator;
 import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
@@ -47,6 +51,9 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.stats.GuidePostsInfo;
+import org.apache.phoenix.schema.stats.StatisticsUtil;
+import org.apache.phoenix.util.SchemaUtil;
 
 
 
@@ -105,7 +112,8 @@ public class ScanPlan extends BaseQueryPlan {
     @Override
     protected ResultIterator newIterator() throws SQLException {
         // Set any scan attributes before creating the scanner, as it will be too late afterwards
-        context.getScan().setAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY, QueryConstants.TRUE);
+    	Scan scan = context.getScan();
+        scan.setAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY, QueryConstants.TRUE);
         ResultIterator scanner;
         TableRef tableRef = this.getTableRef();
         PTable table = tableRef.getTable();
@@ -114,7 +122,40 @@ public class ScanPlan extends BaseQueryPlan {
          * limit is provided, run query serially.
          */
         boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
-        ParallelIterators iterators = new ParallelIterators(this, !allowPageFilter || isOrdered ? null : limit, parallelIteratorFactory);
+        boolean isSerial = false;
+        Integer perScanLimit = !allowPageFilter || isOrdered ? null : limit;
+        /*
+         * If a limit is provided and we have no filter, run the scan serially when we estimate that
+         * the limit's worth of data will fit into a single region.
+         */
+        if (perScanLimit != null && scan.getFilter() == null) {
+        	GuidePostsInfo gpsInfo = table.getTableStats().getGuidePosts().get(SchemaUtil.getEmptyColumnFamily(table));
+            long estRowSize = SchemaUtil.estimateRowSize(table);
+        	long estRegionSize;
+        	if (gpsInfo == null) {
+        	    // Use guidepost depth as minimum size
+        	    ConnectionQueryServices services = context.getConnection().getQueryServices();
+        	    HTableDescriptor desc = services.getTableDescriptor(table.getName().getBytes());
+                int guidepostPerRegion = services.getProps().getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB,
+                        QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION);
+                long guidepostWidth = services.getProps().getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
+                        QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES);
+        	    estRegionSize = StatisticsUtil.getGuidePostDepth(guidepostPerRegion, guidepostWidth, desc);
+        	} else {
+        		// Region size estimated based on total number of bytes divided by number of regions
+        	    estRegionSize = gpsInfo.getByteCount() / (gpsInfo.getGuidePosts().size()+1);
+        	}
+            // TODO: configurable number of bytes?
+            if (perScanLimit * estRowSize < estRegionSize) {
+                isSerial = true;
+            }
+        }
+        ResultIterators iterators;
+        if (isSerial) {
+        	iterators = new SerialIterators(this, perScanLimit, parallelIteratorFactory);
+        } else {
+        	iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory);
+        }
         splits = iterators.getSplits();
         scans = iterators.getScans();
         if (isOrdered) {
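
The serial-versus-parallel decision in the hunk above reduces to a size comparison. Here is a condensed paraphrase of just that check, with illustrative (non-Phoenix) class, method, and parameter names, assuming estRegionSize is derived as in the hunk (guide-post byte count divided by guide-post count plus one when stats exist, otherwise the configured guide-post depth):

    // Illustrative paraphrase of the new ScanPlan decision, not Phoenix API.
    final class SerialScanHeuristic {
        static boolean runSerially(Integer perScanLimit, boolean hasFilter,
                                   long estRowSize, long estRegionSize) {
            // Only a plain limit query qualifies: the limit is pushed into each
            // scan as a PageFilter and no filter can spread matching rows out.
            if (perScanLimit == null || hasFilter) {
                return false;
            }
            // If the limit's worth of rows is estimated to fit inside a single
            // region, one serial scan should beat fanning out parallel scans.
            return perScanLimit * estRowSize < estRegionSize;
        }
    }

When this holds, ScanPlan builds SerialIterators instead of ParallelIterators, which is what flips the EXPLAIN output from CLIENT PARALLEL to CLIENT SERIAL.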

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd3d625/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
new file mode 100644
index 0000000..519c162
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY;
+import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.filter.ColumnProjectionFilter;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.parse.HintNode.Hint;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PColumnFamily;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.PTable.ViewType;
+import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.stats.GuidePostsInfo;
+import org.apache.phoenix.schema.stats.PTableStats;
+import org.apache.phoenix.util.LogUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SQLCloseables;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+
+/**
+ *
+ * Class that parallelizes the scan over a table using the ExecutorService provided.  Each region of the table will be scanned in parallel with
+ * the results accessible through {@link #getIterators()}
+ *
+ * 
+ * @since 0.1
+ */
+public abstract class BaseResultIterators extends ExplainTable implements ResultIterators {
+	private static final Logger logger = LoggerFactory.getLogger(BaseResultIterators.class);
+    private static final int DEFAULT_THREAD_TIMEOUT_MS = 60000; // 1min
+    private static final int ESTIMATED_GUIDEPOSTS_PER_REGION = 20;
+
+    private final List<List<Scan>> scans;
+    private final List<KeyRange> splits;
+    private final PTableStats tableStats;
+    private final byte[] physicalTableName;
+    private final QueryPlan plan;
+    
+    static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new Function<HRegionLocation, KeyRange>() {
+        @Override
+        public KeyRange apply(HRegionLocation region) {
+            return KeyRange.getKeyRange(region.getRegionInfo().getStartKey(), region.getRegionInfo().getEndKey());
+        }
+    };
+
+    private PTable getTable() {
+        return plan.getTableRef().getTable();
+    }
+    
+    private boolean useStats() {
+        Scan scan = context.getScan();
+        boolean isPointLookup = context.getScanRanges().isPointLookup();
+        /*
+         *  Don't use guide posts if:
+         *  1) We're doing a point lookup, as HBase is fast enough at those
+         *     to not need them to be further parallelized. TODO: perf test to verify
+         *  2) We're collecting stats, as in this case we need to scan entire
+         *     regions worth of data to track where to put the guide posts.
+         */
+        if (isPointLookup || ScanUtil.isAnalyzeTable(scan)) {
+            return false;
+        }
+        return true;
+    }
+    
+    public BaseResultIterators(QueryPlan plan, Integer perScanLimit)
+            throws SQLException {
+        super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), plan.getOrderBy(), plan.getStatement().getHint());
+        this.plan = plan;
+        StatementContext context = plan.getContext();
+        TableRef tableRef = plan.getTableRef();
+        PTable table = tableRef.getTable();
+        FilterableStatement statement = plan.getStatement();
+        RowProjector projector = plan.getProjector();
+        physicalTableName = table.getPhysicalName().getBytes();
+        tableStats = useStats() ? new MetaDataClient(context.getConnection()).getTableStats(table) : PTableStats.EMPTY_STATS;
+        Scan scan = context.getScan();
+        if (projector.isProjectEmptyKeyValue()) {
+            Map<byte [], NavigableSet<byte []>> familyMap = scan.getFamilyMap();
+            // If nothing projected into scan and we only have one column family, just allow everything
+            // to be projected and use a FirstKeyOnlyFilter to skip from row to row. This turns out to
+            // be quite a bit faster.
+            // Where condition columns also will get added into familyMap
+            // When where conditions are present, we can not add FirstKeyOnlyFilter at beginning.
+            if (familyMap.isEmpty() && context.getWhereCoditionColumns().isEmpty()
+                    && table.getColumnFamilies().size() == 1) {
+                // Project the one column family. We must project a column family since it's possible
+                // that there are other non declared column families that we need to ignore.
+                scan.addFamily(table.getColumnFamilies().get(0).getName().getBytes());
+                ScanUtil.andFilterAtBeginning(scan, new FirstKeyOnlyFilter());
+            } else {
+                byte[] ecf = SchemaUtil.getEmptyColumnFamily(table);
+                // Project empty key value unless the column family containing it has
+                // been projected in its entirety.
+                if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != null) {
+                    scan.addColumn(ecf, QueryConstants.EMPTY_COLUMN_BYTES);
+                }
+            }
+        } else if (table.getViewType() == ViewType.MAPPED) {
+            // Since we don't have the empty key value in MAPPED tables, we must select all CFs in HRS. But only the
+            // selected column values are returned back to client
+            for (PColumnFamily family : table.getColumnFamilies()) {
+                scan.addFamily(family.getName().getBytes());
+            }
+        }
+        
+        // TODO adding all CFs here is not correct. It should be done only after ColumnProjectionOptimization.
+        if (perScanLimit != null) {
+            ScanUtil.andFilterAtEnd(scan, new PageFilter(perScanLimit));
+        }
+
+        doColumnProjectionOptimization(context, scan, table, statement);
+        
+        this.scans = getParallelScans();
+        List<KeyRange> splitRanges = Lists.newArrayListWithExpectedSize(scans.size() * ESTIMATED_GUIDEPOSTS_PER_REGION);
+        for (List<Scan> scanList : scans) {
+            for (Scan aScan : scanList) {
+                splitRanges.add(KeyRange.getKeyRange(aScan.getStartRow(), aScan.getStopRow()));
+            }
+        }
+        this.splits = ImmutableList.copyOf(splitRanges);
+    }
+
+    private void doColumnProjectionOptimization(StatementContext context, Scan scan, PTable table, FilterableStatement statement) {
+        Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
+        if (familyMap != null && !familyMap.isEmpty()) {
+            // columnsTracker contain cf -> qualifiers which should get returned.
+            Map<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>> columnsTracker = 
+                    new TreeMap<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>>();
+            Set<byte[]> conditionOnlyCfs = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+            int referencedCfCount = familyMap.size();
+            for (Pair<byte[], byte[]> whereCol : context.getWhereCoditionColumns()) {
+                if (!(familyMap.containsKey(whereCol.getFirst()))) {
+                    referencedCfCount++;
+                }
+            }
+            boolean useOptimization;
+            if (statement.getHint().hasHint(Hint.SEEK_TO_COLUMN)) {
+                // Do not use the optimization
+                useOptimization = false;
+            } else if (statement.getHint().hasHint(Hint.NO_SEEK_TO_COLUMN)) {
+                // Strictly use the optimization
+                useOptimization = true;
+            } else {
+                // when referencedCfCount is >1 and no Hints, we are not using the optimization
+                useOptimization = referencedCfCount == 1;
+            }
+            if (useOptimization) {
+                for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
+                    ImmutableBytesPtr cf = new ImmutableBytesPtr(entry.getKey());
+                    NavigableSet<byte[]> qs = entry.getValue();
+                    NavigableSet<ImmutableBytesPtr> cols = null;
+                    if (qs != null) {
+                        cols = new TreeSet<ImmutableBytesPtr>();
+                        for (byte[] q : qs) {
+                            cols.add(new ImmutableBytesPtr(q));
+                        }
+                    }
+                    columnsTracker.put(cf, cols);
+                }
+            }
+            // Making sure that where condition CFs are getting scanned at HRS.
+            for (Pair<byte[], byte[]> whereCol : context.getWhereCoditionColumns()) {
+                if (useOptimization) {
+                    if (!(familyMap.containsKey(whereCol.getFirst()))) {
+                        scan.addFamily(whereCol.getFirst());
+                        conditionOnlyCfs.add(whereCol.getFirst());
+                    }
+                } else {
+                    if (familyMap.containsKey(whereCol.getFirst())) {
+                        // where column's CF is present. If there are some specific columns added against this CF, we
+                        // need to ensure this where column also getting added in it.
+                        // If the select was like select cf1.*, then that itself will select the whole CF. So no need to
+                        // specifically add the where column. Adding that will remove the cf1.* stuff and only this
+                        // where condition column will get returned!
+                        NavigableSet<byte[]> cols = familyMap.get(whereCol.getFirst());
+                        // cols is null means the whole CF will get scanned.
+                        if (cols != null) {
+                            scan.addColumn(whereCol.getFirst(), whereCol.getSecond());
+                        }
+                    } else {
+                        // where column's CF itself is not present in family map. We need to add the column
+                        scan.addColumn(whereCol.getFirst(), whereCol.getSecond());
+                    }
+                }
+            }
+            if (useOptimization && !columnsTracker.isEmpty()) {
+                for (ImmutableBytesPtr f : columnsTracker.keySet()) {
+                    // This addFamily will remove explicit cols in scan familyMap and make it as entire row.
+                    // We don't want the ExplicitColumnTracker to be used. Instead we have the ColumnProjectionFilter
+                    scan.addFamily(f.get());
+                }
+                // We don't need this filter for aggregates, as we're not returning back what's
+                // in the scan in this case. We still want the other optimization that causes
+                // the ExplicitColumnTracker not to be used, though.
+                if (!(statement.isAggregate())) {
+                    ScanUtil.andFilterAtEnd(scan, new ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table),
+                            columnsTracker, conditionOnlyCfs));
+                }
+            }
+        }
+    }
+
+    @Override
+    public List<KeyRange> getSplits() {
+        return splits;
+    }
+
+    @Override
+    public List<List<Scan>> getScans() {
+        return scans;
+    }
+
+    private static List<byte[]> toBoundaries(List<HRegionLocation> regionLocations) {
+        int nBoundaries = regionLocations.size() - 1;
+        List<byte[]> ranges = Lists.newArrayListWithExpectedSize(nBoundaries);
+        for (int i = 0; i < nBoundaries; i++) {
+            HRegionInfo regionInfo = regionLocations.get(i).getRegionInfo();
+            ranges.add(regionInfo.getEndKey());
+        }
+        return ranges;
+    }
+    
+    private static int getIndexContainingInclusive(List<byte[]> boundaries, byte[] inclusiveKey) {
+        int guideIndex = Collections.binarySearch(boundaries, inclusiveKey, Bytes.BYTES_COMPARATOR);
+        // If we found an exact match, return the index+1, as the inclusiveKey will be contained
+        // in the next region (since we're matching on the end boundary).
+        guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : (guideIndex + 1));
+        return guideIndex;
+    }
+    
+    private static int getIndexContainingExclusive(List<byte[]> boundaries, byte[] exclusiveKey) {
+        int guideIndex = Collections.binarySearch(boundaries, exclusiveKey, Bytes.BYTES_COMPARATOR);
+        // If we found an exact match, return the index we found as the exclusiveKey won't be
+        // contained in the next region as with getIndexContainingInclusive.
+        guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : guideIndex);
+        return guideIndex;
+    }
+    
+    private List<byte[]> getGuidePosts() {
+        /*
+         *  Don't use guide posts if:
+         *  1) We're doing a point lookup, as HBase is fast enough at those
+         *     to not need them to be further parallelized. TODO: pref test to verify
+         *  2) We're collecting stats, as in this case we need to scan entire
+         *     regions worth of data to track where to put the guide posts.
+         */
+        if (!useStats()) {
+            return Collections.emptyList();
+        }
+        
+        List<byte[]> gps = null;
+        PTable table = getTable();
+        Map<byte[],GuidePostsInfo> guidePostMap = tableStats.getGuidePosts();
+        byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(getTable());
+        if (table.getColumnFamilies().isEmpty()) {
+            // For sure we can get the defaultCF from the table
+            if (guidePostMap.get(defaultCF) != null) {
+                gps = guidePostMap.get(defaultCF).getGuidePosts();
+            }
+        } else {
+            Scan scan = context.getScan();
+            if (scan.getFamilyMap().size() > 0 && !scan.getFamilyMap().containsKey(defaultCF)) {
+                // If default CF is not used in scan, use first CF referenced in scan
+                GuidePostsInfo guidePostsInfo = guidePostMap.get(scan.getFamilyMap().keySet().iterator().next());
+                if (guidePostsInfo != null) {
+                    gps = guidePostsInfo.getGuidePosts();
+                }
+            } else {
+                // Otherwise, favor use of default CF.
+                if (guidePostMap.get(defaultCF) != null) {
+                    gps = guidePostMap.get(defaultCF).getGuidePosts();
+                }
+            }
+        }
+        if (gps == null) {
+            return Collections.emptyList();
+        }
+        return gps;
+    }
+    
+    private static String toString(List<byte[]> gps) {
+        StringBuilder buf = new StringBuilder(gps.size() * 100);
+        buf.append("[");
+        for (int i = 0; i < gps.size(); i++) {
+            buf.append(Bytes.toStringBinary(gps.get(i)));
+            buf.append(",");
+            if (i > 0 && i < gps.size()-1 && (i % 10) == 0) {
+                buf.append("\n");
+            }
+        }
+        buf.setCharAt(buf.length()-1, ']');
+        return buf.toString();
+    }
+    
+    private List<Scan> addNewScan(List<List<Scan>> parallelScans, List<Scan> scans, Scan scan, byte[] startKey, boolean crossedRegionBoundary) {
+        PTable table = getTable();
+        boolean startNewScanList = false;
+        if (!plan.isRowKeyOrdered()) {
+            startNewScanList = true;
+        } else if (crossedRegionBoundary) {
+            if (table.getIndexType() == IndexType.LOCAL) {
+                startNewScanList = true;
+            } else if (table.getBucketNum() != null) {
+                startNewScanList = scans.isEmpty() ||
+                        ScanUtil.crossesPrefixBoundary(startKey,
+                                ScanUtil.getPrefix(scans.get(scans.size()-1).getStartRow(), SaltingUtil.NUM_SALTING_BYTES), 
+                                SaltingUtil.NUM_SALTING_BYTES);
+            }
+        }
+        if (scan != null) {
+            scans.add(scan);
+        }
+        if (startNewScanList && !scans.isEmpty()) {
+            parallelScans.add(scans);
+            scans = Lists.newArrayListWithExpectedSize(1);
+        }
+        return scans;
+    }
+
+    private List<List<Scan>> getParallelScans() throws SQLException {
+        return getParallelScans(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
+    }
+
+    /**
+     * Compute the list of parallel scans to run for a given query. The inner scans
+     * may be concatenated together directly, while the other ones may need to be
+     * merge sorted, depending on the query.
+     * @return list of parallel scans to run for a given query.
+     * @throws SQLException
+     */
+    private List<List<Scan>> getParallelScans(byte[] startKey, byte[] stopKey) throws SQLException {
+        Scan scan = context.getScan();
+        List<HRegionLocation> regionLocations = context.getConnection().getQueryServices()
+                .getAllTableRegions(physicalTableName);
+        
+        List<byte[]> regionBoundaries = toBoundaries(regionLocations);
+        ScanRanges scanRanges = context.getScanRanges();
+        PTable table = getTable();
+        boolean isSalted = table.getBucketNum() != null;
+        boolean isLocalIndex = table.getIndexType() == IndexType.LOCAL;
+        List<byte[]> gps = getGuidePosts();
+        if (logger.isDebugEnabled()) {
+            logger.debug("Guideposts: " + toString(gps));
+        }
+        boolean traverseAllRegions = isSalted || isLocalIndex;
+        if (!traverseAllRegions) {
+            byte[] scanStartRow = scan.getStartRow();
+            if (scanStartRow.length != 0 && Bytes.compareTo(scanStartRow, startKey) > 0) {
+                startKey = scanStartRow;
+            }
+            byte[] scanStopRow = scan.getStopRow();
+            if (stopKey.length == 0 || Bytes.compareTo(scanStopRow, stopKey) < 0) {
+                stopKey = scanStopRow;
+            }
+        }
+        
+        int regionIndex = 0;
+        int stopIndex = regionBoundaries.size();
+        if (startKey.length > 0) {
+            regionIndex = getIndexContainingInclusive(regionBoundaries, startKey);
+        }
+        if (stopKey.length > 0) {
+            stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), stopKey));
+            if (isLocalIndex) {
+                stopKey = regionLocations.get(stopIndex).getRegionInfo().getEndKey();
+            }
+        }
+        List<List<Scan>> parallelScans = Lists.newArrayListWithExpectedSize(stopIndex - regionIndex + 1);
+        
+        byte[] currentKey = startKey;
+        int guideIndex = currentKey.length == 0 ? 0 : getIndexContainingInclusive(gps, currentKey);
+        int gpsSize = gps.size();
+        int estGuidepostsPerRegion = gpsSize == 0 ? 1 : gpsSize / regionLocations.size() + 1;
+        int keyOffset = 0;
+        List<Scan> scans = Lists.newArrayListWithExpectedSize(estGuidepostsPerRegion);
+        // Merge bisect with guideposts for all but the last region
+        while (regionIndex <= stopIndex) {
+            byte[] currentGuidePost, endKey, endRegionKey = EMPTY_BYTE_ARRAY;
+            if (regionIndex == stopIndex) {
+                endKey = stopKey;
+            } else {
+                endKey = regionBoundaries.get(regionIndex);
+            }
+            if (isLocalIndex) {
+                HRegionInfo regionInfo = regionLocations.get(regionIndex).getRegionInfo();
+                endRegionKey = regionInfo.getEndKey();
+                keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), endRegionKey);
+            }
+            while (guideIndex < gpsSize
+                    && (Bytes.compareTo(currentGuidePost = gps.get(guideIndex), endKey) <= 0 || endKey.length == 0)) {
+                Scan newScan = scanRanges.intersectScan(scan, currentKey, currentGuidePost, keyOffset, false);
+                scans = addNewScan(parallelScans, scans, newScan, currentGuidePost, false);
+                currentKey = currentGuidePost;
+                guideIndex++;
+            }
+            Scan newScan = scanRanges.intersectScan(scan, currentKey, endKey, keyOffset, true);
+            if (isLocalIndex) {
+                if (newScan != null) {
+                    newScan.setAttribute(EXPECTED_UPPER_REGION_KEY, endRegionKey);
+                } else if (!scans.isEmpty()) {
+                    scans.get(scans.size()-1).setAttribute(EXPECTED_UPPER_REGION_KEY, endRegionKey);
+                }
+            }
+            scans = addNewScan(parallelScans, scans, newScan, endKey, true);
+            currentKey = endKey;
+            regionIndex++;
+        }
+        if (!scans.isEmpty()) { // Add any remaining scans
+            parallelScans.add(scans);
+        }
+        if (logger.isDebugEnabled()) {
+            logger.debug(LogUtil.addCustomAnnotations("The parallelScans: " + parallelScans,
+                    ScanUtil.getCustomAnnotations(scan)));
+        }
+        return parallelScans;
+    }
+
+    public static <T> List<T> reverseIfNecessary(List<T> list, boolean reverse) {
+        if (!reverse) {
+            return list;
+        }
+        return Lists.reverse(list);
+    }
+    
+    /**
+     * Executes the scan in parallel across all regions, blocking until all scans are complete.
+     * @return the result iterators for the scan of each region
+     */
+    @Override
+    public List<PeekingResultIterator> getIterators() throws SQLException {
+        boolean success = false;
+        boolean isReverse = ScanUtil.isReversed(context.getScan());
+        boolean isLocalIndex = getTable().getIndexType() == IndexType.LOCAL;
+        final ConnectionQueryServices services = context.getConnection().getQueryServices();
+        ReadOnlyProps props = services.getProps();
+        int numSplits = size();
+        List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numSplits);
+        final List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numSplits);
+        // TODO: what purpose does this scanID serve?
+        final UUID scanId = UUID.randomUUID();
+        try {
+            submitWork(scanId, scans, futures, splits.size());
+            int timeoutMs = props.getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS);
+            boolean clearedCache = false;
+            for (List<Pair<Scan,Future<PeekingResultIterator>>> future : reverseIfNecessary(futures,isReverse)) {
+                List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(future.size());
+                for (Pair<Scan,Future<PeekingResultIterator>> scanPair : reverseIfNecessary(future,isReverse)) {
+                    try {
+                        PeekingResultIterator iterator = scanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
+                        concatIterators.add(iterator);
+                    } catch (ExecutionException e) {
+                        try { // Rethrow as SQLException
+                            throw ServerUtil.parseServerException(e);
+                        } catch (StaleRegionBoundaryCacheException e2) { // Catch only to try to recover from region boundary cache being out of date
+                            List<List<Pair<Scan,Future<PeekingResultIterator>>>> newFutures = Lists.newArrayListWithExpectedSize(2);
+                            if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries
+                                services.clearTableRegionCache(physicalTableName);
+                                clearedCache = true;
+                            }
+                            // Resubmit just this portion of work again
+                            Scan oldScan = scanPair.getFirst();
+                            byte[] startKey = oldScan.getStartRow();
+                            byte[] endKey = oldScan.getStopRow();
+                            if (isLocalIndex) {
+                                endKey = oldScan.getAttribute(EXPECTED_UPPER_REGION_KEY);
+                            }
+                            List<List<Scan>> newNestedScans = this.getParallelScans(startKey, endKey);
+                            // Add any concatIterators that were successful so far
+                            // as we need these to be in order
+                            addIterator(iterators, concatIterators);
+                            concatIterators = Collections.emptyList();
+                            submitWork(scanId, newNestedScans, newFutures, newNestedScans.size());
+                            for (List<Pair<Scan,Future<PeekingResultIterator>>> newFuture : reverseIfNecessary(newFutures, isReverse)) {
+                                for (Pair<Scan,Future<PeekingResultIterator>> newScanPair : reverseIfNecessary(newFuture, isReverse)) {
+                                    // Immediately do a get (not catching the exception again) and then add the iterators we
+                                    // get back immediately. They'll be sorted as expected, since they're replacing the
+                                    // original one.
+                                    PeekingResultIterator iterator = newScanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
+                                    iterators.add(iterator);
+                                }
+                            }
+                        }
+                    }
+                }
+                addIterator(iterators, concatIterators);
+            }
+
+            success = true;
+            return iterators;
+        } catch (SQLException e) {
+            throw e;
+        } catch (Exception e) {
+            throw ServerUtil.parseServerException(e);
+        } finally {
+            if (!success) {
+                SQLCloseables.closeAllQuietly(iterators);
+                // Don't call cancel on already started work, as it causes the HConnection
+                // to get into a funk. Instead, just cancel queued work.
+                for (List<Pair<Scan,Future<PeekingResultIterator>>> futureScans : futures) {
+                    for (Pair<Scan,Future<PeekingResultIterator>> futurePair : futureScans) {
+                        futurePair.getSecond().cancel(false);
+                    }
+                }
+            }
+        }
+    }
+    
+    private void addIterator(List<PeekingResultIterator> parentIterators, List<PeekingResultIterator> childIterators) {
+        if (!childIterators.isEmpty()) {
+            parentIterators.add(ConcatResultIterator.newIterator(childIterators));
+        }
+    }
+
+    protected static final class ScanLocator {
+    	private final int outerListIndex;
+    	private final int innerListIndex;
+    	private final Scan scan;
+    	
+    	public ScanLocator(Scan scan, int outerListIndex, int innerListIndex) {
+    		this.outerListIndex = outerListIndex;
+    		this.innerListIndex = innerListIndex;
+    		this.scan = scan;
+    	}
+    	public int getOuterListIndex() {
+    		return outerListIndex;
+    	}
+    	public int getInnerListIndex() {
+    		return innerListIndex;
+    	}
+    	public Scan getScan() {
+    		return scan;
+    	}
+    }
+    
+
+    abstract protected String getName();    
+    abstract protected void submitWork(final UUID scanId, List<List<Scan>> nestedScans,
+            List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, int estFlattenedSize);
+
+    @Override
+    public int size() {
+        return this.scans.size();
+    }
+
+    @Override
+    public void explain(List<String> planSteps) {
+        boolean displayChunkCount = context.getConnection().getQueryServices().getProps().getBoolean(
+                QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB,
+                QueryServicesOptions.DEFAULT_EXPLAIN_CHUNK_COUNT);
+        StringBuilder buf = new StringBuilder();
+        buf.append("CLIENT " + (displayChunkCount ? (this.splits.size() + "-CHUNK ") : "") + getName() + " " + size() + "-WAY ");
+        explain(buf.toString(),planSteps);
+    }
+
+	@Override
+	public String toString() {
+		return "ResultIterators [name=" + getName() + ",scans=" + scans + "]";
+	}
+}
\ No newline at end of file

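For readers tracing the guidepost merge above: the two index lookups it leans on (getIndexContainingInclusive / getIndexContainingExclusive, visible further down in the removed ParallelIterators code) differ only in how an exact match on a region end key is treated. Below is a minimal standalone sketch, not part of the patch, with a plain lexicographic comparator standing in for Bytes.BYTES_COMPARATOR and a hypothetical class name.

import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class BoundarySearchSketch {
    // Lexicographic byte[] comparator, standing in for Bytes.BYTES_COMPARATOR.
    private static final Comparator<byte[]> CMP = new Comparator<byte[]>() {
        @Override
        public int compare(byte[] a, byte[] b) {
            int n = Math.min(a.length, b.length);
            for (int i = 0; i < n; i++) {
                int d = (a[i] & 0xff) - (b[i] & 0xff);
                if (d != 0) return d;
            }
            return a.length - b.length;
        }
    };

    // An inclusive key that equals a region end key belongs to the *next* region.
    static int indexContainingInclusive(List<byte[]> boundaries, byte[] key) {
        int i = Collections.binarySearch(boundaries, key, CMP);
        return i < 0 ? -(i + 1) : i + 1;
    }

    // An exclusive key that equals a region end key still stops in *this* region.
    static int indexContainingExclusive(List<byte[]> boundaries, byte[] key) {
        int i = Collections.binarySearch(boundaries, key, CMP);
        return i < 0 ? -(i + 1) : i;
    }

    public static void main(String[] args) {
        // Three regions: (-inf,"d"), ["d","m"), ["m",+inf); boundaries are the end keys of all but the last.
        List<byte[]> boundaries = Arrays.asList("d".getBytes(), "m".getBytes());
        System.out.println(indexContainingInclusive(boundaries, "d".getBytes())); // prints 1
        System.out.println(indexContainingExclusive(boundaries, "d".getBytes())); // prints 0
    }
}

With boundaries ["d","m"], an inclusive start key of "d" resolves to region 1 while an exclusive stop key of "d" stays in region 0, which is what lets the loop above walk regionIndex from the start region to the stop region without double-counting a boundary.
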
http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd3d625/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
index 4a62259..fecb0d1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
@@ -44,7 +44,7 @@ import com.google.common.base.Preconditions;
 public class ChunkedResultIterator implements PeekingResultIterator {
     private static final Logger logger = LoggerFactory.getLogger(ChunkedResultIterator.class);
 
-    private final ParallelIterators.ParallelIteratorFactory delegateIteratorFactory;
+    private final ParallelIteratorFactory delegateIteratorFactory;
     private ImmutableBytesWritable lastKey = new ImmutableBytesWritable();
     private final StatementContext context;
     private final TableRef tableRef;
@@ -52,12 +52,12 @@ public class ChunkedResultIterator implements PeekingResultIterator {
     private final long chunkSize;
     private PeekingResultIterator resultIterator;
 
-    public static class ChunkedResultIteratorFactory implements ParallelIterators.ParallelIteratorFactory {
+    public static class ChunkedResultIteratorFactory implements ParallelIteratorFactory {
 
-        private final ParallelIterators.ParallelIteratorFactory delegateFactory;
+        private final ParallelIteratorFactory delegateFactory;
         private final TableRef tableRef;
 
-        public ChunkedResultIteratorFactory(ParallelIterators.ParallelIteratorFactory
+        public ChunkedResultIteratorFactory(ParallelIteratorFactory
                 delegateFactory, TableRef tableRef) {
             this.delegateFactory = delegateFactory;
             this.tableRef = tableRef;
@@ -74,7 +74,7 @@ public class ChunkedResultIterator implements PeekingResultIterator {
         }
     }
 
-    public ChunkedResultIterator(ParallelIterators.ParallelIteratorFactory delegateIteratorFactory,
+    public ChunkedResultIterator(ParallelIteratorFactory delegateIteratorFactory,
             StatementContext context, TableRef tableRef, Scan scan, long chunkSize) throws SQLException {
         this.delegateIteratorFactory = delegateIteratorFactory;
         this.context = context;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd3d625/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
index cddf3b3..03f8785 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
@@ -39,8 +39,13 @@ public class ConcatResultIterator implements PeekingResultIterator {
         this.resultIterators = iterators;
     }
     
+    private ConcatResultIterator(List<PeekingResultIterator> iterators) {
+        this.resultIterators = null;
+        this.iterators = iterators;
+    }
+    
     private List<PeekingResultIterator> getIterators() throws SQLException {
-        if (iterators == null) {
+        if (iterators == null && resultIterators != null) {
             iterators = resultIterators.getIterators();
         }
         return iterators;
@@ -59,7 +64,9 @@ public class ConcatResultIterator implements PeekingResultIterator {
 
     @Override
     public void explain(List<String> planSteps) {
-        resultIterators.explain(planSteps);
+        if (resultIterators != null) {
+            resultIterators.explain(planSteps);
+        }
     }
 
     private PeekingResultIterator currentIterator() throws SQLException {
@@ -88,11 +95,11 @@ public class ConcatResultIterator implements PeekingResultIterator {
 
 	@Override
 	public String toString() {
-		return "ConcatResultIterator [resultIterators=" + resultIterators
-				+ ", iterators=" + iterators + ", index=" + index + "]";
+		return "ConcatResultIterator [" + (resultIterators == null ? ("iterators=" + iterators) : ("resultIterators=" + resultIterators))
+				+ ", index=" + index + "]";
 	}
 
-    public static PeekingResultIterator newConcatResultIterator(final List<PeekingResultIterator> concatIterators) {
+    public static PeekingResultIterator newIterator(final List<PeekingResultIterator> concatIterators) {
         if (concatIterators.isEmpty()) {
             return PeekingResultIterator.EMPTY_ITERATOR;
         } 
@@ -100,24 +107,6 @@ public class ConcatResultIterator implements PeekingResultIterator {
         if (concatIterators.size() == 1) {
             return concatIterators.get(0);
         }
-        return new ConcatResultIterator(new ResultIterators() {
-
-            @Override
-            public List<PeekingResultIterator> getIterators() throws SQLException {
-                return concatIterators;
-            }
-
-            @Override
-            public int size() {
-                return concatIterators.size();
-            }
-
-            @Override
-            public void explain(List<String> planSteps) {
-                // TODO: review what we should for explain plan here
-                concatIterators.get(0).explain(planSteps);
-            }
-            
-        });
+        return new ConcatResultIterator(concatIterators);
     }
 }

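A hedged usage sketch of the renamed factory method (newConcatResultIterator becomes newIterator): empty input returns the shared empty iterator, a single input is returned as-is, and only two or more iterators go through the new list-based constructor. PeekingResultIterator.EMPTY_ITERATOR is used purely to keep the sketch self-contained, and the class name is hypothetical.

import java.util.Collections;
import java.util.List;

import org.apache.phoenix.iterate.ConcatResultIterator;
import org.apache.phoenix.iterate.PeekingResultIterator;

import com.google.common.collect.Lists;

public class ConcatUsageSketch {
    public static void main(String[] args) {
        // Empty input: the shared EMPTY_ITERATOR comes back, no wrapper is allocated.
        PeekingResultIterator none = ConcatResultIterator.newIterator(
                Collections.<PeekingResultIterator>emptyList());

        // Single input: that iterator is returned unchanged.
        PeekingResultIterator one = ConcatResultIterator.newIterator(
                Collections.singletonList(PeekingResultIterator.EMPTY_ITERATOR));

        // Two or more: they are concatenated via the new private list-based constructor.
        List<PeekingResultIterator> several = Lists.newArrayList(
                PeekingResultIterator.EMPTY_ITERATOR, PeekingResultIterator.EMPTY_ITERATOR);
        PeekingResultIterator concat = ConcatResultIterator.newIterator(several);
    }
}
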
http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd3d625/phoenix-core/src/main/java/org/apache/phoenix/iterate/DelegateResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DelegateResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DelegateResultIterator.java
index 98b95c6..63b3142 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DelegateResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DelegateResultIterator.java
@@ -30,6 +30,10 @@ public class DelegateResultIterator implements ResultIterator {
         this.delegate = delegate;
     }
     
+    protected ResultIterator getDelegate() {
+    	return delegate;
+    }
+    
     @Override
     public void close() throws SQLException {
         delegate.close();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd3d625/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingPeekingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingPeekingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingPeekingResultIterator.java
new file mode 100644
index 0000000..a80693d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingPeekingResultIterator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * 
+ * Iterates through tuples up to a limit
+ *
+ * 
+ * @since 1.2
+ */
+public class LimitingPeekingResultIterator extends LimitingResultIterator implements PeekingResultIterator {
+    
+    public LimitingPeekingResultIterator(PeekingResultIterator delegate, int limit) {
+        super(delegate, limit);
+    }
+
+    
+    @Override
+    protected PeekingResultIterator getDelegate() {
+    	return (PeekingResultIterator) super.getDelegate();
+    }
+    
+	@Override
+	public Tuple peek() throws SQLException {
+		return getDelegate().peek();
+	}
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd3d625/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
new file mode 100644
index 0000000..1ad3af0
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.StatementContext;
+
+public interface ParallelIteratorFactory {
+    PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException;
+}
\ No newline at end of file

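The extracted ParallelIteratorFactory interface is what ChunkedResultIteratorFactory above already implements. A minimal sketch of a custom implementation, illustrative only and assuming PeekingResultIterator.EMPTY_ITERATOR is an acceptable return value, could look like this:

import java.sql.SQLException;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.iterate.ParallelIteratorFactory;
import org.apache.phoenix.iterate.PeekingResultIterator;
import org.apache.phoenix.iterate.ResultIterator;

// Illustrative only: ignores the scanner and hands back the shared empty iterator.
public class NoOpIteratorFactory implements ParallelIteratorFactory {
    @Override
    public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan)
            throws SQLException {
        scanner.close(); // nothing is read from the scanner in this sketch
        return PeekingResultIterator.EMPTY_ITERATOR;
    }
}
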

[2/3] phoenix git commit: PHOENIX-1432 Run limit query that has only leading PK column filter serially

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd3d625/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index 7905d34..bde3f78 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -17,68 +17,23 @@
  */
 package org.apache.phoenix.iterate;
 
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY;
-import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
-
 import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NavigableSet;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
 import java.util.UUID;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
-import org.apache.hadoop.hbase.filter.PageFilter;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.compile.RowProjector;
-import org.apache.phoenix.compile.ScanRanges;
-import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.filter.ColumnProjectionFilter;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.job.JobManager.JobCallable;
-import org.apache.phoenix.parse.FilterableStatement;
-import org.apache.phoenix.parse.HintNode.Hint;
-import org.apache.phoenix.query.ConnectionQueryServices;
-import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.MetaDataClient;
-import org.apache.phoenix.schema.PColumnFamily;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.PTable.IndexType;
-import org.apache.phoenix.schema.PTable.ViewType;
-import org.apache.phoenix.schema.SaltingUtil;
-import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
-import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.schema.stats.GuidePostsInfo;
-import org.apache.phoenix.schema.stats.PTableStats;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.LogUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.SQLCloseables;
 import org.apache.phoenix.util.ScanUtil;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.ServerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
 
@@ -90,537 +45,43 @@ import com.google.common.collect.Lists;
  * 
  * @since 0.1
  */
-public class ParallelIterators extends ExplainTable implements ResultIterators {
+public class ParallelIterators extends BaseResultIterators {
 	private static final Logger logger = LoggerFactory.getLogger(ParallelIterators.class);
-    private final List<List<Scan>> scans;
-    private final List<KeyRange> splits;
-    private final PTableStats tableStats;
-    private final byte[] physicalTableName;
-    private final QueryPlan plan;
+	private static final String NAME = "PARALLEL";
     private final ParallelIteratorFactory iteratorFactory;
     
-    public static interface ParallelIteratorFactory {
-        PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException;
-    }
-
-    private static final int DEFAULT_THREAD_TIMEOUT_MS = 60000; // 1min
-    private static final int ESTIMATED_GUIDEPOSTS_PER_REGION = 20;
-
-    static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new Function<HRegionLocation, KeyRange>() {
-        @Override
-        public KeyRange apply(HRegionLocation region) {
-            return KeyRange.getKeyRange(region.getRegionInfo().getStartKey(), region.getRegionInfo().getEndKey());
-        }
-    };
-
-    private PTable getTable() {
-        return plan.getTableRef().getTable();
-    }
-    
-    private boolean useStats() {
-        Scan scan = context.getScan();
-        boolean isPointLookup = context.getScanRanges().isPointLookup();
-        /*
-         *  Don't use guide posts if:
-         *  1) We're doing a point lookup, as HBase is fast enough at those
-         *     to not need them to be further parallelized. TODO: pref test to verify
-         *  2) We're collecting stats, as in this case we need to scan entire
-         *     regions worth of data to track where to put the guide posts.
-         */
-        if (isPointLookup || ScanUtil.isAnalyzeTable(scan)) {
-            return false;
-        }
-        return true;
-    }
-    
     public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory)
             throws SQLException {
-        super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), plan.getOrderBy(), plan.getStatement().getHint());
-        this.plan = plan;
-        StatementContext context = plan.getContext();
-        TableRef tableRef = plan.getTableRef();
-        PTable table = tableRef.getTable();
-        FilterableStatement statement = plan.getStatement();
-        RowProjector projector = plan.getProjector();
-        physicalTableName = table.getPhysicalName().getBytes();
-        tableStats = useStats() ? new MetaDataClient(context.getConnection()).getTableStats(table) : PTableStats.EMPTY_STATS;
-        Scan scan = context.getScan();
-        if (projector.isProjectEmptyKeyValue()) {
-            Map<byte [], NavigableSet<byte []>> familyMap = scan.getFamilyMap();
-            // If nothing projected into scan and we only have one column family, just allow everything
-            // to be projected and use a FirstKeyOnlyFilter to skip from row to row. This turns out to
-            // be quite a bit faster.
-            // Where condition columns also will get added into familyMap
-            // When where conditions are present, we can not add FirstKeyOnlyFilter at beginning.
-            if (familyMap.isEmpty() && context.getWhereCoditionColumns().isEmpty()
-                    && table.getColumnFamilies().size() == 1) {
-                // Project the one column family. We must project a column family since it's possible
-                // that there are other non declared column families that we need to ignore.
-                scan.addFamily(table.getColumnFamilies().get(0).getName().getBytes());
-                ScanUtil.andFilterAtBeginning(scan, new FirstKeyOnlyFilter());
-            } else {
-                byte[] ecf = SchemaUtil.getEmptyColumnFamily(table);
-                // Project empty key value unless the column family containing it has
-                // been projected in its entirety.
-                if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != null) {
-                    scan.addColumn(ecf, QueryConstants.EMPTY_COLUMN_BYTES);
-                }
-            }
-        } else if (table.getViewType() == ViewType.MAPPED) {
-            // Since we don't have the empty key value in MAPPED tables, we must select all CFs in HRS. But only the
-            // selected column values are returned back to client
-            for (PColumnFamily family : table.getColumnFamilies()) {
-                scan.addFamily(family.getName().getBytes());
-            }
-        }
-        
-        // TODO adding all CFs here is not correct. It should be done only after ColumnProjectionOptimization.
-        if (perScanLimit != null) {
-            ScanUtil.andFilterAtEnd(scan, new PageFilter(perScanLimit));
-        }
-
-        doColumnProjectionOptimization(context, scan, table, statement);
-        
+        super(plan, perScanLimit);
         this.iteratorFactory = iteratorFactory;
-        this.scans = getParallelScans();
-        List<KeyRange> splitRanges = Lists.newArrayListWithExpectedSize(scans.size() * ESTIMATED_GUIDEPOSTS_PER_REGION);
-        for (List<Scan> scanList : scans) {
-            for (Scan aScan : scanList) {
-                splitRanges.add(KeyRange.getKeyRange(aScan.getStartRow(), aScan.getStopRow()));
-            }
-        }
-        this.splits = ImmutableList.copyOf(splitRanges);
-    }
-
-    private void doColumnProjectionOptimization(StatementContext context, Scan scan, PTable table, FilterableStatement statement) {
-        Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
-        if (familyMap != null && !familyMap.isEmpty()) {
-            // columnsTracker contain cf -> qualifiers which should get returned.
-            Map<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>> columnsTracker = 
-                    new TreeMap<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>>();
-            Set<byte[]> conditionOnlyCfs = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
-            int referencedCfCount = familyMap.size();
-            for (Pair<byte[], byte[]> whereCol : context.getWhereCoditionColumns()) {
-                if (!(familyMap.containsKey(whereCol.getFirst()))) {
-                    referencedCfCount++;
-                }
-            }
-            boolean useOptimization;
-            if (statement.getHint().hasHint(Hint.SEEK_TO_COLUMN)) {
-                // Do not use the optimization
-                useOptimization = false;
-            } else if (statement.getHint().hasHint(Hint.NO_SEEK_TO_COLUMN)) {
-                // Strictly use the optimization
-                useOptimization = true;
-            } else {
-                // when referencedCfCount is >1 and no Hints, we are not using the optimization
-                useOptimization = referencedCfCount == 1;
-            }
-            if (useOptimization) {
-                for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
-                    ImmutableBytesPtr cf = new ImmutableBytesPtr(entry.getKey());
-                    NavigableSet<byte[]> qs = entry.getValue();
-                    NavigableSet<ImmutableBytesPtr> cols = null;
-                    if (qs != null) {
-                        cols = new TreeSet<ImmutableBytesPtr>();
-                        for (byte[] q : qs) {
-                            cols.add(new ImmutableBytesPtr(q));
-                        }
-                    }
-                    columnsTracker.put(cf, cols);
-                }
-            }
-            // Making sure that where condition CFs are getting scanned at HRS.
-            for (Pair<byte[], byte[]> whereCol : context.getWhereCoditionColumns()) {
-                if (useOptimization) {
-                    if (!(familyMap.containsKey(whereCol.getFirst()))) {
-                        scan.addFamily(whereCol.getFirst());
-                        conditionOnlyCfs.add(whereCol.getFirst());
-                    }
-                } else {
-                    if (familyMap.containsKey(whereCol.getFirst())) {
-                        // where column's CF is present. If there are some specific columns added against this CF, we
-                        // need to ensure this where column also getting added in it.
-                        // If the select was like select cf1.*, then that itself will select the whole CF. So no need to
-                        // specifically add the where column. Adding that will remove the cf1.* stuff and only this
-                        // where condition column will get returned!
-                        NavigableSet<byte[]> cols = familyMap.get(whereCol.getFirst());
-                        // cols is null means the whole CF will get scanned.
-                        if (cols != null) {
-                            scan.addColumn(whereCol.getFirst(), whereCol.getSecond());
-                        }
-                    } else {
-                        // where column's CF itself is not present in family map. We need to add the column
-                        scan.addColumn(whereCol.getFirst(), whereCol.getSecond());
-                    }
-                }
-            }
-            if (useOptimization && !columnsTracker.isEmpty()) {
-                for (ImmutableBytesPtr f : columnsTracker.keySet()) {
-                    // This addFamily will remove explicit cols in scan familyMap and make it as entire row.
-                    // We don't want the ExplicitColumnTracker to be used. Instead we have the ColumnProjectionFilter
-                    scan.addFamily(f.get());
-                }
-                // We don't need this filter for aggregates, as we're not returning back what's
-                // in the scan in this case. We still want the other optimization that causes
-                // the ExplicitColumnTracker not to be used, though.
-                if (!(statement.isAggregate())) {
-                    ScanUtil.andFilterAtEnd(scan, new ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table),
-                            columnsTracker, conditionOnlyCfs));
-                }
-            }
-        }
-    }
-
-    public List<KeyRange> getSplits() {
-        return splits;
-    }
-
-    public List<List<Scan>> getScans() {
-        return scans;
-    }
-
-    private static List<byte[]> toBoundaries(List<HRegionLocation> regionLocations) {
-        int nBoundaries = regionLocations.size() - 1;
-        List<byte[]> ranges = Lists.newArrayListWithExpectedSize(nBoundaries);
-        for (int i = 0; i < nBoundaries; i++) {
-            HRegionInfo regionInfo = regionLocations.get(i).getRegionInfo();
-            ranges.add(regionInfo.getEndKey());
-        }
-        return ranges;
-    }
-    
-    private static int getIndexContainingInclusive(List<byte[]> boundaries, byte[] inclusiveKey) {
-        int guideIndex = Collections.binarySearch(boundaries, inclusiveKey, Bytes.BYTES_COMPARATOR);
-        // If we found an exact match, return the index+1, as the inclusiveKey will be contained
-        // in the next region (since we're matching on the end boundary).
-        guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : (guideIndex + 1));
-        return guideIndex;
-    }
-    
-    private static int getIndexContainingExclusive(List<byte[]> boundaries, byte[] exclusiveKey) {
-        int guideIndex = Collections.binarySearch(boundaries, exclusiveKey, Bytes.BYTES_COMPARATOR);
-        // If we found an exact match, return the index we found as the exclusiveKey won't be
-        // contained in the next region as with getIndexContainingInclusive.
-        guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : guideIndex);
-        return guideIndex;
-    }
-    
-    private List<byte[]> getGuidePosts() {
-        /*
-         *  Don't use guide posts if:
-         *  1) We're doing a point lookup, as HBase is fast enough at those
-         *     to not need them to be further parallelized. TODO: pref test to verify
-         *  2) We're collecting stats, as in this case we need to scan entire
-         *     regions worth of data to track where to put the guide posts.
-         */
-        if (!useStats()) {
-            return Collections.emptyList();
-        }
-        
-        List<byte[]> gps = null;
-        PTable table = getTable();
-        Map<byte[],GuidePostsInfo> guidePostMap = tableStats.getGuidePosts();
-        byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(getTable());
-        if (table.getColumnFamilies().isEmpty()) {
-            // For sure we can get the defaultCF from the table
-            if (guidePostMap.get(defaultCF) != null) {
-                gps = guidePostMap.get(defaultCF).getGuidePosts();
-            }
-        } else {
-            Scan scan = context.getScan();
-            if (scan.getFamilyMap().size() > 0 && !scan.getFamilyMap().containsKey(defaultCF)) {
-                // If default CF is not used in scan, use first CF referenced in scan
-                GuidePostsInfo guidePostsInfo = guidePostMap.get(scan.getFamilyMap().keySet().iterator().next());
-                if (guidePostsInfo != null) {
-                    gps = guidePostsInfo.getGuidePosts();
-                }
-            } else {
-                // Otherwise, favor use of default CF.
-                if (guidePostMap.get(defaultCF) != null) {
-                    gps = guidePostMap.get(defaultCF).getGuidePosts();
-                }
-            }
-        }
-        if (gps == null) {
-            return Collections.emptyList();
-        }
-        return gps;
-    }
-    
-    private static String toString(List<byte[]> gps) {
-        StringBuilder buf = new StringBuilder(gps.size() * 100);
-        buf.append("[");
-        for (int i = 0; i < gps.size(); i++) {
-            buf.append(Bytes.toStringBinary(gps.get(i)));
-            buf.append(",");
-            if (i > 0 && i < gps.size()-1 && (i % 10) == 0) {
-                buf.append("\n");
-            }
-        }
-        buf.setCharAt(buf.length()-1, ']');
-        return buf.toString();
-    }
-    
-    private List<Scan> addNewScan(List<List<Scan>> parallelScans, List<Scan> scans, Scan scan, byte[] startKey, boolean crossedRegionBoundary) {
-        PTable table = getTable();
-        boolean startNewScanList = false;
-        if (!plan.isRowKeyOrdered()) {
-            startNewScanList = true;
-        } else if (crossedRegionBoundary) {
-            if (table.getIndexType() == IndexType.LOCAL) {
-                startNewScanList = true;
-            } else if (table.getBucketNum() != null) {
-                startNewScanList = scans.isEmpty() ||
-                        ScanUtil.crossesPrefixBoundary(startKey,
-                                ScanUtil.getPrefix(scans.get(scans.size()-1).getStartRow(), SaltingUtil.NUM_SALTING_BYTES), 
-                                SaltingUtil.NUM_SALTING_BYTES);
-            }
-        }
-        if (scan != null) {
-            scans.add(scan);
-        }
-        if (startNewScanList && !scans.isEmpty()) {
-            parallelScans.add(scans);
-            scans = Lists.newArrayListWithExpectedSize(1);
-        }
-        return scans;
-    }
-
-    private List<List<Scan>> getParallelScans() throws SQLException {
-        return getParallelScans(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
     }
 
-    /**
-     * Compute the list of parallel scans to run for a given query. The inner scans
-     * may be concatenated together directly, while the other ones may need to be
-     * merge sorted, depending on the query.
-     * @return list of parallel scans to run for a given query.
-     * @throws SQLException
-     */
-    private List<List<Scan>> getParallelScans(byte[] startKey, byte[] stopKey) throws SQLException {
-        Scan scan = context.getScan();
-        List<HRegionLocation> regionLocations = context.getConnection().getQueryServices()
-                .getAllTableRegions(physicalTableName);
-        
-        List<byte[]> regionBoundaries = toBoundaries(regionLocations);
-        ScanRanges scanRanges = context.getScanRanges();
-        PTable table = getTable();
-        boolean isSalted = table.getBucketNum() != null;
-        boolean isLocalIndex = table.getIndexType() == IndexType.LOCAL;
-        List<byte[]> gps = getGuidePosts();
-        if (logger.isDebugEnabled()) {
-            logger.debug("Guideposts: " + toString(gps));
-        }
-        boolean traverseAllRegions = isSalted || isLocalIndex;
-        if (!traverseAllRegions) {
-            byte[] scanStartRow = scan.getStartRow();
-            if (scanStartRow.length != 0 && Bytes.compareTo(scanStartRow, startKey) > 0) {
-                startKey = scanStartRow;
-            }
-            byte[] scanStopRow = scan.getStopRow();
-            if (stopKey.length == 0 || Bytes.compareTo(scanStopRow, stopKey) < 0) {
-                stopKey = scanStopRow;
-            }
-        }
-        
-        int regionIndex = 0;
-        int stopIndex = regionBoundaries.size();
-        if (startKey.length > 0) {
-            regionIndex = getIndexContainingInclusive(regionBoundaries, startKey);
-        }
-        if (stopKey.length > 0) {
-            stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), stopKey));
-            if (isLocalIndex) {
-                stopKey = regionLocations.get(stopIndex).getRegionInfo().getEndKey();
-            }
-        }
-        List<List<Scan>> parallelScans = Lists.newArrayListWithExpectedSize(stopIndex - regionIndex + 1);
-        
-        byte[] currentKey = startKey;
-        int guideIndex = currentKey.length == 0 ? 0 : getIndexContainingInclusive(gps, currentKey);
-        int gpsSize = gps.size();
-        int estGuidepostsPerRegion = gpsSize == 0 ? 1 : gpsSize / regionLocations.size() + 1;
-        int keyOffset = 0;
-        List<Scan> scans = Lists.newArrayListWithExpectedSize(estGuidepostsPerRegion);
-        // Merge bisect with guideposts for all but the last region
-        while (regionIndex <= stopIndex) {
-            byte[] currentGuidePost, endKey, endRegionKey = EMPTY_BYTE_ARRAY;
-            if (regionIndex == stopIndex) {
-                endKey = stopKey;
-            } else {
-                endKey = regionBoundaries.get(regionIndex);
-            }
-            if (isLocalIndex) {
-                HRegionInfo regionInfo = regionLocations.get(regionIndex).getRegionInfo();
-                endRegionKey = regionInfo.getEndKey();
-                keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), endRegionKey);
-            }
-            while (guideIndex < gpsSize
-                    && (Bytes.compareTo(currentGuidePost = gps.get(guideIndex), endKey) <= 0 || endKey.length == 0)) {
-                Scan newScan = scanRanges.intersectScan(scan, currentKey, currentGuidePost, keyOffset, false);
-                scans = addNewScan(parallelScans, scans, newScan, currentGuidePost, false);
-                currentKey = currentGuidePost;
-                guideIndex++;
-            }
-            Scan newScan = scanRanges.intersectScan(scan, currentKey, endKey, keyOffset, true);
-            if (isLocalIndex) {
-                if (newScan != null) {
-                    newScan.setAttribute(EXPECTED_UPPER_REGION_KEY, endRegionKey);
-                } else if (!scans.isEmpty()) {
-                    scans.get(scans.size()-1).setAttribute(EXPECTED_UPPER_REGION_KEY, endRegionKey);
-                }
-            }
-            scans = addNewScan(parallelScans, scans, newScan, endKey, true);
-            currentKey = endKey;
-            regionIndex++;
-        }
-        if (!scans.isEmpty()) { // Add any remaining scans
-            parallelScans.add(scans);
-        }
-        if (logger.isDebugEnabled()) {
-            logger.debug(LogUtil.addCustomAnnotations("The parallelScans: " + parallelScans,
-                    ScanUtil.getCustomAnnotations(scan)));
-        }
-        return parallelScans;
-    }
-
-    public static <T> List<T> reverseIfNecessary(List<T> list, boolean reverse) {
-        if (!reverse) {
-            return list;
-        }
-        return Lists.reverse(list);
-    }
-    
-    private static void addConcatResultIterator(List<PeekingResultIterator> iterators, final List<PeekingResultIterator> concatIterators) {
-        if (!concatIterators.isEmpty()) {
-            iterators.add(ConcatResultIterator.newConcatResultIterator(concatIterators));
-        }
-    }
-    /**
-     * Executes the scan in parallel across all regions, blocking until all scans are complete.
-     * @return the result iterators for the scan of each region
-     */
     @Override
-    public List<PeekingResultIterator> getIterators() throws SQLException {
-        boolean success = false;
-        boolean isReverse = ScanUtil.isReversed(context.getScan());
-        boolean isLocalIndex = getTable().getIndexType() == IndexType.LOCAL;
-        final ConnectionQueryServices services = context.getConnection().getQueryServices();
-        ReadOnlyProps props = services.getProps();
-        int numSplits = size();
-        List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numSplits);
-        List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numSplits);
-        // TODO: what purpose does this scanID serve?
-        final UUID scanId = UUID.randomUUID();
-        try {
-            submitWork(scanId, scans, futures, splits.size());
-            int timeoutMs = props.getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS);
-            boolean clearedCache = false;
-            for (List<Pair<Scan,Future<PeekingResultIterator>>> future : reverseIfNecessary(futures,isReverse)) {
-                List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(future.size());
-                for (Pair<Scan,Future<PeekingResultIterator>> scanPair : reverseIfNecessary(future,isReverse)) {
-                    try {
-                        PeekingResultIterator iterator = scanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
-                        concatIterators.add(iterator);
-                    } catch (ExecutionException e) {
-                        try { // Rethrow as SQLException
-                            throw ServerUtil.parseServerException(e);
-                        } catch (StaleRegionBoundaryCacheException e2) { // Catch only to try to recover from region boundary cache being out of date
-                            List<List<Pair<Scan,Future<PeekingResultIterator>>>> newFutures = Lists.newArrayListWithExpectedSize(2);
-                            if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries
-                                services.clearTableRegionCache(physicalTableName);
-                                clearedCache = true;
-                            }
-                            // Resubmit just this portion of work again
-                            Scan oldScan = scanPair.getFirst();
-                            byte[] startKey = oldScan.getStartRow();
-                            byte[] endKey = oldScan.getStopRow();
-                            if (isLocalIndex) {
-                                endKey = oldScan.getAttribute(EXPECTED_UPPER_REGION_KEY);
-                            }
-                            List<List<Scan>> newNestedScans = this.getParallelScans(startKey, endKey);
-                            // Add any concatIterators that were successful so far
-                            // as we need these to be in order
-                            addConcatResultIterator(iterators, concatIterators);
-                            concatIterators = Collections.emptyList();
-                            submitWork(scanId, newNestedScans, newFutures, newNestedScans.size());
-                            for (List<Pair<Scan,Future<PeekingResultIterator>>> newFuture : reverseIfNecessary(newFutures, isReverse)) {
-                                for (Pair<Scan,Future<PeekingResultIterator>> newScanPair : reverseIfNecessary(newFuture, isReverse)) {
-                                    // Immediate do a get (not catching exception again) and then add the iterators we
-                                    // get back immediately. They'll be sorted as expected, since they're replacing the
-                                    // original one.
-                                    PeekingResultIterator iterator = newScanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
-                                    iterators.add(iterator);
-                                }
-                            }
-                        }
-                    }
-                }
-                addConcatResultIterator(iterators, concatIterators);
-            }
-
-            success = true;
-            return iterators;
-        } catch (SQLException e) {
-            throw e;
-        } catch (Exception e) {
-            throw ServerUtil.parseServerException(e);
-        } finally {
-            if (!success) {
-                SQLCloseables.closeAllQuietly(iterators);
-                // Don't call cancel, as it causes the HConnection to get into a funk
-//                for (Pair<KeyRange,Future<PeekingResultIterator>> future : futures) {
-//                    future.getSecond().cancel(true);
-//                }
-            }
-        }
-    }
-    
-    private static final class ScanLocation {
-    	private final int outerListIndex;
-    	private final int innerListIndex;
-    	private final Scan scan;
-    	public ScanLocation(Scan scan, int outerListIndex, int innerListIndex) {
-    		this.outerListIndex = outerListIndex;
-    		this.innerListIndex = innerListIndex;
-    		this.scan = scan;
-    	}
-    	public int getOuterListIndex() {
-    		return outerListIndex;
-    	}
-    	public int getInnerListIndex() {
-    		return innerListIndex;
-    	}
-    	public Scan getScan() {
-    		return scan;
-    	}
-    }
-    private void submitWork(final UUID scanId, List<List<Scan>> nestedScans,
+    protected void submitWork(final UUID scanId, List<List<Scan>> nestedScans,
             List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, int estFlattenedSize) {
-        final ConnectionQueryServices services = context.getConnection().getQueryServices();
-        ExecutorService executor = services.getExecutor();
         // Pre-populate nestedFutures lists so that we can shuffle the scans
         // and add the future to the right nested list. By shuffling the scans
         // we get better utilization of the cluster since our thread executor
         // will spray the scans across machines as opposed to targeting a
         // single one since the scans are in row key order.
-        List<ScanLocation> scanLocations = Lists.newArrayListWithExpectedSize(estFlattenedSize);
+        ExecutorService executor = context.getConnection().getQueryServices().getExecutor();
+        List<ScanLocator> scanLocations = Lists.newArrayListWithExpectedSize(estFlattenedSize);
         for (int i = 0; i < nestedScans.size(); i++) {
             List<Scan> scans = nestedScans.get(i);
             List<Pair<Scan,Future<PeekingResultIterator>>> futures = Lists.newArrayListWithExpectedSize(scans.size());
             nestedFutures.add(futures);
             for (int j = 0; j < scans.size(); j++) {
             	Scan scan = nestedScans.get(i).get(j);
-                scanLocations.add(new ScanLocation(scan, i, j));
+                scanLocations.add(new ScanLocator(scan, i, j));
                 futures.add(null); // placeholder
             }
         }
+        // Shuffle so that we start execution across many machines
+        // before we fill up the thread pool
         Collections.shuffle(scanLocations);
-        for (ScanLocation scanLocation : scanLocations) {
+        for (ScanLocator scanLocation : scanLocations) {
             final Scan scan = scanLocation.getScan();
-            Future<PeekingResultIterator> future =
-                executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
+            Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
 
                 @Override
                 public PeekingResultIterator call() throws Exception {
@@ -649,22 +110,7 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
     }
 
     @Override
-    public int size() {
-        return this.scans.size();
-    }
-
-    @Override
-    public void explain(List<String> planSteps) {
-        boolean displayChunkCount = context.getConnection().getQueryServices().getProps().getBoolean(
-                QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB,
-                QueryServicesOptions.DEFAULT_EXPLAIN_CHUNK_COUNT);
-        StringBuilder buf = new StringBuilder();
-        buf.append("CLIENT " + (displayChunkCount ? (this.splits.size() + "-CHUNK ") : "") + "PARALLEL " + size() + "-WAY ");
-        explain(buf.toString(),planSteps);
+    protected String getName() {
+        return NAME;
     }
-
-	@Override
-	public String toString() {
-		return "ParallelIterators [scans=" + scans + "]";
-	}
 }
\ No newline at end of file

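The "pre-populate then shuffle" comment retained in ParallelIterators.submitWork is easy to gloss over: submission order is randomized, but each future still has to land back in its original (outer, inner) slot so that iteration order is preserved. A small plain-Java sketch of that idea, with strings standing in for scans and futures and all names hypothetical:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ShuffleSubmitSketch {
    // Stand-in for ScanLocator: remembers where a task belongs in the nested result lists.
    static final class Slot {
        final int outer, inner;
        final String task;
        Slot(String task, int outer, int inner) { this.task = task; this.outer = outer; this.inner = inner; }
    }

    public static void main(String[] args) {
        List<List<String>> nestedTasks = Arrays.asList(
                Arrays.asList("scan-0-0", "scan-0-1"),
                Arrays.asList("scan-1-0"));

        // Pre-populate the nested result lists with placeholders so positions are stable.
        List<List<String>> nestedResults = new ArrayList<List<String>>();
        List<Slot> slots = new ArrayList<Slot>();
        for (int i = 0; i < nestedTasks.size(); i++) {
            nestedResults.add(new ArrayList<String>(Collections.<String>nCopies(nestedTasks.get(i).size(), null)));
            for (int j = 0; j < nestedTasks.get(i).size(); j++) {
                slots.add(new Slot(nestedTasks.get(i).get(j), i, j));
            }
        }

        // Randomize submission order (spreads work across region servers in the real code) ...
        Collections.shuffle(slots);
        // ... but write each "future" back into its original position so iteration order is preserved.
        for (Slot s : slots) {
            nestedResults.get(s.outer).set(s.inner, "future(" + s.task + ")");
        }
        System.out.println(nestedResults);
    }
}
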
http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd3d625/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java
index 3051608..ef2b534 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java
@@ -20,8 +20,13 @@ package org.apache.phoenix.iterate;
 import java.sql.SQLException;
 import java.util.List;
 
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.query.KeyRange;
+
 public interface ResultIterators {
-    public List<PeekingResultIterator> getIterators() throws SQLException;
     public int size();
+    public List<KeyRange> getSplits();
+    public List<List<Scan>> getScans();
     public void explain(List<String> planSteps);
+    public List<PeekingResultIterator> getIterators() throws SQLException;
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd3d625/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
new file mode 100644
index 0000000..5cb64a0
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.job.JobManager.JobCallable;
+import org.apache.phoenix.trace.util.Tracing;
+import org.apache.phoenix.util.LogUtil;
+import org.apache.phoenix.util.ScanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+
+/**
+ *
+ * Class that executes the scans over a table serially, one scan at a time within each submitted
+ * group of scans, capping the rows returned at the supplied limit, with the results accessible
+ * through {@link #getIterators()}
+ *
+ * 
+ * @since 0.1
+ */
+public class SerialIterators extends BaseResultIterators {
+	private static final Logger logger = LoggerFactory.getLogger(SerialIterators.class);
+	private static final String NAME = "SERIAL";
+    private final ParallelIteratorFactory iteratorFactory;
+    private final int limit;
+    
+    public SerialIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory)
+            throws SQLException {
+        super(plan, perScanLimit);
+        Preconditions.checkArgument(perScanLimit != null); // must be a limit specified
+        this.iteratorFactory = iteratorFactory;
+        this.limit = perScanLimit;
+    }
+
+    @Override
+    protected void submitWork(final UUID scanId, List<List<Scan>> nestedScans,
+            List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, int estFlattenedSize) {
+        // Unlike the parallel case, nothing is shuffled here: one callable is submitted per
+        // group of scans, and the scans within a group are executed one after another, in row
+        // key order, inside that single callable.
+        ExecutorService executor = context.getConnection().getQueryServices().getExecutor();
+        
+        for (final List<Scan> scans : nestedScans) {
+            Scan firstScan = scans.get(0);
+            Scan lastScan = scans.get(scans.size()-1);
+            final Scan overallScan = ScanUtil.newScan(firstScan);
+            overallScan.setStopRow(lastScan.getStopRow());
+            Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
+
+                @Override
+                public PeekingResultIterator call() throws Exception {
+                	List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(scans.size());
+                	for (final Scan scan : scans) {
+	                    long startTime = System.currentTimeMillis();
+	                    ResultIterator scanner = new TableResultIterator(context, tableRef, scan);
+	                    if (logger.isDebugEnabled()) {
+	                        logger.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan, ScanUtil.getCustomAnnotations(scan)));
+	                    }
+	                    concatIterators.add(iteratorFactory.newIterator(context, scanner, scan));
+                	}
+                	PeekingResultIterator concatIterator = ConcatResultIterator.newIterator(concatIterators);
+                	return new LimitingPeekingResultIterator(concatIterator, limit);
+                }
+
+                /**
+                 * Defines the grouping for round robin behavior.  All threads spawned to process
+                 * this scan will be grouped together and time sliced with other simultaneously
+                 * executing parallel scans.
+                 */
+                @Override
+                public Object getJobId() {
+                    return SerialIterators.this;
+                }
+            }, "Parallel scanner for table: " + tableRef.getTable().getName().getString()));
+            // Add our singleton Future which will execute serially
+            nestedFutures.add(Collections.singletonList(new Pair<Scan,Future<PeekingResultIterator>>(overallScan,future)));
+        }
+    }
+
+    @Override
+    protected String getName() {
+        return NAME;
+    }
+}
\ No newline at end of file
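
The heart of the new class is the "concatenate the per-region scans in row key order, then cap the output at the limit" pattern inside the single submitted task. A minimal standalone sketch of that pattern, using plain java.util iterators rather than Phoenix's PeekingResultIterator or ConcatResultIterator (which add peeking and explain support), looks like this:

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.NoSuchElementException;

    public class ConcatThenLimitSketch {
        // Concatenate the given iterators in order and stop after 'limit' elements,
        // mirroring ConcatResultIterator wrapped by LimitingPeekingResultIterator.
        static <T> Iterator<T> concatThenLimit(final List<Iterator<T>> parts, final int limit) {
            return new Iterator<T>() {
                private int returned = 0;
                private int current = 0;

                @Override
                public boolean hasNext() {
                    if (returned >= limit) {
                        return false; // limit reached; remaining scans are never touched
                    }
                    while (current < parts.size() && !parts.get(current).hasNext()) {
                        current++; // move to the next scan once the current one is exhausted
                    }
                    return current < parts.size();
                }

                @Override
                public T next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    returned++;
                    return parts.get(current).next();
                }

                @Override
                public void remove() {
                    throw new UnsupportedOperationException();
                }
            };
        }

        public static void main(String[] args) {
            List<Iterator<Integer>> scans = Arrays.asList(
                    Arrays.asList(1, 2, 3).iterator(),
                    Arrays.asList(4, 5, 6).iterator());
            Iterator<Integer> limited = concatThenLimit(scans, 4);
            while (limited.hasNext()) {
                System.out.println(limited.next()); // prints 1, 2, 3, 4
            }
        }
    }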

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd3d625/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
index 2a5080e..a343b48 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
 import org.apache.phoenix.memory.MemoryManager;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.query.QueryServices;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd3d625/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index 8f6d026..bf6de8a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -30,7 +30,7 @@ import org.apache.phoenix.compile.IndexStatementRewriter;
 import org.apache.phoenix.compile.QueryCompiler;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.SequenceManager;
-import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
+import org.apache.phoenix.iterate.ParallelIteratorFactory;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.parse.HintNode.Hint;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd3d625/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 4894b18..6c03780 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -149,6 +149,7 @@ public class QueryServicesOptions {
     public static final double DEFAULT_TRACING_PROBABILITY_THRESHOLD = 0.05;
 
     public static final int DEFAULT_STATS_UPDATE_FREQ_MS = 15 * 60000; // 15min
+    public static final int DEFAULT_STATS_GUIDEPOST_PER_REGION = 0; // Uses guidepost width by default
     public static final long DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES = 100 * 1024 *1024; // 100MB
     public static final boolean DEFAULT_STATS_USE_CURRENT_TIME = true;
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd3d625/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
index 90c8324..9c85e63 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
@@ -25,11 +25,8 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -40,7 +37,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.util.ByteUtil;
@@ -74,21 +70,14 @@ public class StatisticsCollector {
 
     public StatisticsCollector(RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp) throws IOException {
         Configuration config = env.getConfiguration();
-        HTableInterface statsHTable = env.getTable(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES));
-        int guidepostPerRegion = config.getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB, 0);
-        if (guidepostPerRegion > 0) {
-            long maxFileSize = statsHTable.getTableDescriptor().getMaxFileSize();
-            if (maxFileSize <= 0) { // HBase brain dead API doesn't give you the "real" max file size if it's not set...
-                maxFileSize = HConstants.DEFAULT_MAX_FILE_SIZE;
-            }
-            guidepostDepth = maxFileSize / guidepostPerRegion;
-        } else {
-            guidepostDepth = config.getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
-                    QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES);
-        }
+        int guidepostPerRegion = config.getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB, 
+                QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION);
+        long guidepostWidth = config.getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
+                QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES);
+        this.guidepostDepth = StatisticsUtil.getGuidePostDepth(guidepostPerRegion, guidepostWidth, env.getRegion().getTableDesc());
         // Get the stats table associated with the current table on which the CP is
         // triggered
-        this.statsTable = StatisticsWriter.newWriter(statsHTable, tableName, clientTimeStamp);
+        this.statsTable = StatisticsWriter.newWriter(env, tableName, clientTimeStamp);
     }
     
     public long getMaxTimeStamp() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd3d625/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
index d8ffd84..2a7047f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
@@ -24,6 +24,8 @@ import java.util.TreeMap;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTableInterface;
@@ -110,4 +112,19 @@ public class StatisticsUtil {
         }
         return PTableStats.EMPTY_STATS;
     }
+    
+    public static long getGuidePostDepth(int guidepostPerRegion, long guidepostWidth, HTableDescriptor tableDesc) {
+        if (guidepostPerRegion > 0) {
+            long maxFileSize = HConstants.DEFAULT_MAX_FILE_SIZE;
+            if (tableDesc != null) {
+                long tableMaxFileSize = tableDesc.getMaxFileSize();
+                if (tableMaxFileSize >= 0) {
+                    maxFileSize = tableMaxFileSize;
+                }
+            }
+            return maxFileSize / guidepostPerRegion;
+        } else {
+            return guidepostWidth;
+        }
+    }
 }
\ No newline at end of file
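
The precedence implemented here: when QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB is set to a positive value it wins, and the depth is the region max file size divided by that count; otherwise QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB is used directly as the depth. A standalone restatement of getGuidePostDepth() with illustrative numbers (the 10 GB fallback stands in for HConstants.DEFAULT_MAX_FILE_SIZE and is an assumption here, not read from HBase):

    public class GuidePostDepthSketch {
        // Mirrors StatisticsUtil.getGuidePostDepth(): the per-region count wins when positive,
        // otherwise the configured width is used directly as the guidepost depth.
        static long guidePostDepth(int guidepostPerRegion, long guidepostWidthBytes, long regionMaxFileSize) {
            if (guidepostPerRegion > 0) {
                long maxFileSize = 10L * 1024 * 1024 * 1024; // assumed default region max file size (10 GB)
                if (regionMaxFileSize >= 0) {                // HTableDescriptor returns -1 when MAX_FILESIZE is unset
                    maxFileSize = regionMaxFileSize;
                }
                return maxFileSize / guidepostPerRegion;
            }
            return guidepostWidthBytes;
        }

        public static void main(String[] args) {
            long width = 100L * 1024 * 1024;                          // the 100MB guidepost width default
            System.out.println(guidePostDepth(0, width, -1));         // 104857600 -> width used as-is
            System.out.println(guidePostDepth(20, width, -1));        // 536870912 -> 10GB / 20
            System.out.println(guidePostDepth(20, width, 2L << 30));  // 107374182 -> 2GB / 20
        }
    }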

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd3d625/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
index 6681042..f70c327 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
@@ -26,11 +26,13 @@ import java.util.List;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
@@ -43,6 +45,7 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.TimeKeeper;
 
 import com.google.protobuf.ServiceException;
@@ -63,24 +66,31 @@ public class StatisticsWriter implements Closeable {
      * @throws IOException
      *             if the table cannot be created due to an underlying HTable creation error
      */
-    public static StatisticsWriter newWriter(HTableInterface hTable, String tableName, long clientTimeStamp) throws IOException {
+    public static StatisticsWriter newWriter(RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp) throws IOException {
         if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) {
             clientTimeStamp = TimeKeeper.SYSTEM.getCurrentTime();
         }
-        StatisticsWriter statsTable = new StatisticsWriter(hTable, tableName, clientTimeStamp);
+        HTableInterface statsWriterTable = env.getTable(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES));
+        HTableInterface statsReaderTable = ServerUtil.getHTableForCoprocessorScan(env, statsWriterTable);
+        StatisticsWriter statsTable = new StatisticsWriter(statsReaderTable, statsWriterTable, tableName, clientTimeStamp);
         if (clientTimeStamp != StatisticsCollector.NO_TIMESTAMP) { // Otherwise we do this later as we don't know the ts yet
             statsTable.commitLastStatsUpdatedTime();
         }
         return statsTable;
     }
 
-    private final HTableInterface statisticsTable;
+    private final HTableInterface statsWriterTable;
+    // In HBase 0.98.6 or above, the reader and writer will be the same.
+    // Before 0.98.6 there was a bug (HBASE-11837) in using the HTable returned
+    // from a coprocessor for scans, so in that case they'll be different.
+    private final HTableInterface statsReaderTable;
     private final byte[] tableName;
     private final long clientTimeStamp;
 
-    private StatisticsWriter(HTableInterface statsTable, String tableName, long clientTimeStamp) {
-        this.statisticsTable = statsTable;
-        this.tableName = PDataType.VARCHAR.toBytes(tableName);
+    private StatisticsWriter(HTableInterface statsReaderTable, HTableInterface statsWriterTable, String tableName, long clientTimeStamp) {
+        this.statsReaderTable = statsReaderTable;
+        this.statsWriterTable = statsWriterTable;
+        this.tableName = Bytes.toBytes(tableName);
         this.clientTimeStamp = clientTimeStamp;
     }
 
@@ -89,7 +99,7 @@ public class StatisticsWriter implements Closeable {
      */
     @Override
     public void close() throws IOException {
-        statisticsTable.close();
+        statsWriterTable.close();
     }
 
     public void splitStats(HRegion p, HRegion l, HRegion r, StatisticsCollector tracker, String fam, List<Mutation> mutations) throws IOException {
@@ -100,7 +110,7 @@ public class StatisticsWriter implements Closeable {
         }
         long readTimeStamp = useMaxTimeStamp ? HConstants.LATEST_TIMESTAMP : clientTimeStamp;
         byte[] famBytes = PDataType.VARCHAR.toBytes(fam);
-        Result result = StatisticsUtil.readRegionStatistics(statisticsTable, tableName, famBytes, p.getRegionName(), readTimeStamp);
+        Result result = StatisticsUtil.readRegionStatistics(statsReaderTable, tableName, famBytes, p.getRegionName(), readTimeStamp);
         if (result != null && !result.isEmpty()) {
         	Cell cell = result.getColumnLatestCell(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES);
 
@@ -211,7 +221,7 @@ public class StatisticsWriter implements Closeable {
                 mrmBuilder.addMutationRequest(ProtobufUtil.toMutation(getMutationType(m), m));
             }
             MutateRowsRequest mrm = mrmBuilder.build();
-            CoprocessorRpcChannel channel = statisticsTable.coprocessorService(row);
+            CoprocessorRpcChannel channel = statsWriterTable.coprocessorService(row);
             MultiRowMutationService.BlockingInterface service =
                     MultiRowMutationService.newBlockingStub(channel);
             try {
@@ -235,7 +245,7 @@ public class StatisticsWriter implements Closeable {
         // Always use wallclock time for this, as it's a mechanism to prevent
         // stats from being collected too often.
         Put put = getLastStatsUpdatedTimePut(clientTimeStamp);
-        statisticsTable.put(put);
+        statsWriterTable.put(put);
     }
     
     public void deleteStats(byte[] regionName, StatisticsCollector tracker, String fam, List<Mutation> mutations)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd3d625/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index f1e625b..1ca246f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -33,6 +33,7 @@ import java.util.Properties;
 
 import javax.annotation.Nullable;
 
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.exception.SQLExceptionCode;
@@ -69,6 +70,7 @@ import com.google.common.base.Preconditions;
  */
 public class SchemaUtil {
     private static final int VAR_LENGTH_ESTIMATE = 10;
+    private static final int VAR_KV_LENGTH_ESTIMATE = 50;
     public static final String ESCAPE_CHARACTER = "\"";
     public static final DataBlockEncoding DEFAULT_DATA_BLOCK_ENCODING = DataBlockEncoding.FAST_DIFF;
     public static final PDatum VAR_BINARY_DATUM = new PDatum() {
@@ -110,6 +112,28 @@ public class SchemaUtil {
     public static boolean isPKColumn(PColumn column) {
         return column.getFamilyName() == null;
     }
+  
+    /**
+     * Imperfect estimate of row size given a PTable
+     * TODO: keep row count in stats table and use total size / row count instead
+     * @param table
+     * @return estimate of size in bytes of a row
+     */
+    public static long estimateRowSize(PTable table) {
+    	int keyLength = estimateKeyLength(table);
+    	long rowSize = 0;
+    	for (PColumn column : table.getColumns()) {
+    		if (!SchemaUtil.isPKColumn(column)) {
+                PDataType type = column.getDataType();
+                Integer maxLength = column.getMaxLength();
+                int valueLength = !type.isFixedWidth() ? VAR_KV_LENGTH_ESTIMATE : maxLength == null ? type.getByteSize() : maxLength;
+    			rowSize += KeyValue.getKeyValueDataStructureSize(keyLength, column.getFamilyName().getBytes().length, column.getName().getBytes().length, valueLength);
+    		}
+    	}
+    	// Empty key value
+    	rowSize += KeyValue.getKeyValueDataStructureSize(keyLength, getEmptyColumnFamily(table).length, QueryConstants.EMPTY_COLUMN_BYTES.length, 0);
+    	return rowSize;
+    }
     
     /**
      * Estimate the max key length in bytes of the PK for a given table
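
The new estimateRowSize() charges one KeyValue-sized contribution per non-PK column plus one for the empty key value Phoenix writes to every row. A rough standalone sketch of that arithmetic follows; the ~20-byte per-cell framing constant approximates KeyValue.getKeyValueDataStructureSize(), and the family/qualifier lengths in main() are made-up illustrations:

    public class RowSizeSketch {
        // Approximate per-KeyValue framing overhead: two 4-byte length fields, 2-byte row length,
        // 1-byte family length, 8-byte timestamp and 1-byte type. Assumed here, not read from KeyValue.
        private static final int KV_FRAMING = 20;

        static long cellSize(int rowKeyLen, int familyLen, int qualifierLen, int valueLen) {
            return KV_FRAMING + rowKeyLen + familyLen + qualifierLen + valueLen;
        }

        public static void main(String[] args) {
            int rowKeyLen = 12;                             // estimated PK length for the table
            long rowSize = cellSize(rowKeyLen, 1, 4, 8)     // fixed-width column: family "0", qualifier "COL1", BIGINT
                         + cellSize(rowKeyLen, 1, 10, 50)   // variable-width column at the 50-byte estimate
                         + cellSize(rowKeyLen, 1, 2, 0);    // empty key value written per row ("_0", no value)
            System.out.println(rowSize);                    // 45 + 93 + 35 = 173 bytes per row
        }
    }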

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd3d625/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
index 3327dba..7205faa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
@@ -35,6 +35,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.hbase.index.util.VersionUtil;
 
 
+@SuppressWarnings("deprecation")
 public class ServerUtil {
     private static final int COPROCESSOR_SCAN_WORKS = VersionUtil.encodeVersion("0.98.6");
     
@@ -133,19 +134,34 @@ public class ServerUtil {
         return null;
     }
 
-    @SuppressWarnings("deprecation")
-    public static HTableInterface getHTableForCoprocessorScan (RegionCoprocessorEnvironment env, String tableName) throws IOException {
-        String versionString = env.getHBaseVersion();
-        int version = VersionUtil.encodeVersion(versionString);
-        if (version >= COPROCESSOR_SCAN_WORKS) {
-            // The following *should* work, but doesn't due to HBASE-11837 which was fixed in 0.98.6
-            return env.getTable(TableName.valueOf(tableName));
-        }
-        // This code works around HBASE-11837
+    private static boolean coprocessorScanWorks(RegionCoprocessorEnvironment env) {
+        return (VersionUtil.encodeVersion(env.getHBaseVersion()) >= COPROCESSOR_SCAN_WORKS);
+    }
+    
+    /*
+     * This code works around HBASE-11837 which causes HTableInterfaces retrieved from
+     * RegionCoprocessorEnvironment to not read local data.
+     */
+    private static HTableInterface getTableFromSingletonPool(RegionCoprocessorEnvironment env, byte[] tableName) {
         // It's ok to not ever do a pool.close() as we're storing a single
         // table only. The HTablePool holds no other resources than this table,
         // which will be closed itself when it's no longer needed.
+        @SuppressWarnings("resource")
         HTablePool pool = new HTablePool(env.getConfiguration(),1);
         return pool.getTable(tableName);
     }
+    
+    public static HTableInterface getHTableForCoprocessorScan (RegionCoprocessorEnvironment env, HTableInterface writerTable) throws IOException {
+        if (coprocessorScanWorks(env)) {
+            return writerTable;
+        }
+        return getTableFromSingletonPool(env, writerTable.getTableName());
+    }
+    
+    public static HTableInterface getHTableForCoprocessorScan (RegionCoprocessorEnvironment env, byte[] tableName) throws IOException {
+        if (coprocessorScanWorks(env)) {
+            return env.getTable(TableName.valueOf(tableName));
+        }
+        return getTableFromSingletonPool(env, tableName);
+    }
 }
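
Both getHTableForCoprocessorScan() overloads reduce to the same gate: encode the running HBase version as a comparable integer and only trust env.getTable() at or above the release where HBASE-11837 is fixed. A sketch of that gate with an illustrative encoding (Phoenix's own VersionUtil.encodeVersion is what the real code uses; the bit layout below is not its actual implementation):

    public class VersionGateSketch {
        // Illustrative version gate: parse "major.minor.patch" into one comparable int.
        static int encodeVersion(String version) {
            String[] parts = version.split("[.\\-]");
            int major = Integer.parseInt(parts[0]);
            int minor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
            int patch = parts.length > 2 ? Integer.parseInt(parts[2]) : 0;
            return (major << 20) | (minor << 10) | patch; // assumes each component < 1024
        }

        public static void main(String[] args) {
            int fixedIn = encodeVersion("0.98.6"); // release carrying the HBASE-11837 fix
            // At or above the fix: reuse the coprocessor-provided table for scans.
            System.out.println(encodeVersion("0.98.6") >= fixedIn);  // true
            // Below the fix: fall back to the HTablePool-backed workaround.
            System.out.println(encodeVersion("0.98.4") >= fixedIn);  // false
        }
    }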

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd3d625/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
index 136a997..9dfcce5 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
@@ -44,6 +44,7 @@ import org.apache.phoenix.expression.function.SumAggregateFunction;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.PLongColumn;
 import org.apache.phoenix.schema.PName;
@@ -140,6 +141,16 @@ public class AggregateResultScannerTest extends BaseConnectionlessQueryTest {
             @Override
             public void explain(List<String> planSteps) {
             }
+
+			@Override
+			public List<KeyRange> getSplits() {
+				return Collections.emptyList();
+			}
+
+			@Override
+			public List<List<Scan>> getScans() {
+				return Collections.emptyList();
+			}
             
         };
         ResultIterator scanner = new GroupedAggregatingResultIterator(new MergeSortRowKeyResultIterator(iterators), aggregationManager.getAggregators());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd3d625/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
index a135729..02fdcea 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
@@ -24,9 +24,10 @@ import java.sql.SQLException;
 import java.util.*;
 
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
-
+import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.AssertResults;
@@ -68,6 +69,15 @@ public class ConcatResultIteratorTest {
             public void explain(List<String> planSteps) {
             }
             
+			@Override
+			public List<KeyRange> getSplits() {
+				return Collections.emptyList();
+			}
+
+			@Override
+			public List<List<Scan>> getScans() {
+				return Collections.emptyList();
+			}
         };
 
         Tuple[] expectedResults = new Tuple[] {
@@ -118,6 +128,15 @@ public class ConcatResultIteratorTest {
             public void explain(List<String> planSteps) {
             }
             
+			@Override
+			public List<KeyRange> getSplits() {
+				return Collections.emptyList();
+			}
+
+			@Override
+			public List<List<Scan>> getScans() {
+				return Collections.emptyList();
+			}
         };
         ResultIterator scanner = new MergeSortRowKeyResultIterator(iterators);
         AssertResults.assertResults(scanner, expectedResults);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd3d625/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java
index 9ff088e..095027c 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java
@@ -24,9 +24,10 @@ import java.sql.SQLException;
 import java.util.*;
 
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
-
+import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.AssertResults;
@@ -73,6 +74,15 @@ public class MergeSortResultIteratorTest {
             public void explain(List<String> planSteps) {
             }
             
+			@Override
+			public List<KeyRange> getSplits() {
+				return Collections.emptyList();
+			}
+
+			@Override
+			public List<List<Scan>> getScans() {
+				return Collections.emptyList();
+			}
         };
         ResultIterator scanner = new MergeSortRowKeyResultIterator(iterators);
         AssertResults.assertResults(scanner, expectedResults);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd3d625/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java
index 1e3df0b..6139aa5 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java
@@ -25,6 +25,7 @@ import java.sql.ResultSet;
 import java.sql.Statement;
 import java.util.Properties;
 
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.junit.Test;
 
@@ -183,5 +184,55 @@ public class QueryPlanTest extends BaseConnectionlessQueryTest {
             }
         }
     }
+    
+    @Test
+    public void testTenantSpecificConnWithLimit() throws Exception {
+        String baseTableDDL = "CREATE TABLE BASE_MULTI_TENANT_TABLE(\n " + 
+                "  tenant_id VARCHAR(5) NOT NULL,\n" + 
+                "  userid INTEGER NOT NULL,\n" + 
+                "  username VARCHAR NOT NULL,\n" +
+                "  col VARCHAR\n " + 
+                "  CONSTRAINT pk PRIMARY KEY (tenant_id, userid, username)) MULTI_TENANT=true";
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute(baseTableDDL);
+        conn.close();
+        
+        String tenantId = "tenantId";
+        String tenantViewDDL = "CREATE VIEW TENANT_VIEW AS SELECT * FROM BASE_MULTI_TENANT_TABLE";
+        Properties tenantProps = new Properties();
+        tenantProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        conn = DriverManager.getConnection(getUrl(), tenantProps);
+        conn.createStatement().execute(tenantViewDDL);
+        
+        String query = "EXPLAIN SELECT * FROM TENANT_VIEW LIMIT 1";
+        ResultSet rs = conn.createStatement().executeQuery(query);
+        assertEquals("CLIENT SERIAL 1-WAY RANGE SCAN OVER BASE_MULTI_TENANT_TABLE ['tenantId']\n" + 
+                "    SERVER FILTER BY PageFilter 1\n" + 
+                "    SERVER 1 ROW LIMIT\n" + 
+                "CLIENT 1 ROW LIMIT", QueryUtil.getExplainPlan(rs));
+        query = "EXPLAIN SELECT * FROM TENANT_VIEW LIMIT " + Integer.MAX_VALUE;
+        rs = conn.createStatement().executeQuery(query);
+        assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER BASE_MULTI_TENANT_TABLE ['tenantId']\n" + 
+                "    SERVER FILTER BY PageFilter " + Integer.MAX_VALUE + "\n" + 
+                "    SERVER " + Integer.MAX_VALUE + " ROW LIMIT\n" + 
+                "CLIENT " + Integer.MAX_VALUE + " ROW LIMIT", QueryUtil.getExplainPlan(rs));
+        query = "EXPLAIN SELECT * FROM TENANT_VIEW WHERE username = 'Joe' LIMIT 1";
+        rs = conn.createStatement().executeQuery(query);
+        assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER BASE_MULTI_TENANT_TABLE ['tenantId']\n" + 
+                "    SERVER FILTER BY USERNAME = 'Joe'\n" + 
+                "    SERVER 1 ROW LIMIT\n" + 
+                "CLIENT 1 ROW LIMIT", QueryUtil.getExplainPlan(rs));
+        query = "EXPLAIN SELECT * FROM TENANT_VIEW WHERE col = 'Joe' LIMIT 1";
+        rs = conn.createStatement().executeQuery(query);
+        assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER BASE_MULTI_TENANT_TABLE ['tenantId']\n" + 
+                "    SERVER FILTER BY COL = 'Joe'\n" + 
+                "    SERVER 1 ROW LIMIT\n" + 
+                "CLIENT 1 ROW LIMIT", QueryUtil.getExplainPlan(rs));
+    }
+    
+    @Test
+    public void testLimitOnTenantSpecific() throws Exception {
+        
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbd3d625/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
index 2bff620..f6808a8 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
@@ -105,7 +105,7 @@ public final class PhoenixRecordReader extends RecordReader<NullWritable,Phoenix
                 PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
                 iterators.add(peekingResultIterator);
             }
-            ResultIterator iterator = ConcatResultIterator.newConcatResultIterator(iterators);
+            ResultIterator iterator = ConcatResultIterator.newIterator(iterators);
             if(queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) {
                 iterator = new SequenceResultIterator(iterator, queryPlan.getContext().getSequenceManager());
             }