Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/11/13 06:55:27 UTC

[2/9] phoenix git commit: PHOENIX-1432 Run limit query that has only leading PK column filter serially
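
For context, a hedged illustration of the query shape this change targets (the table and
column names below are hypothetical, not from the patch): a filter on only the leading
primary key column combined with a LIMIT, which after this change can be executed by the
new SerialIterators rather than ParallelIterators, since such a small bounded scan gains
little from being parallelized.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class LeadingPkLimitQuery {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                 Statement stmt = conn.createStatement()) {
                // ORG_ID is assumed to be the leading column of the primary key.
                ResultSet rs = stmt.executeQuery(
                        "SELECT * FROM EVENTS WHERE ORG_ID = 'org1' LIMIT 100");
                while (rs.next()) {
                    System.out.println(rs.getString(1));
                }
            }
        }
    }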

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c1f18f2/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index 7905d34..bde3f78 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -17,68 +17,23 @@
  */
 package org.apache.phoenix.iterate;
 
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY;
-import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
-
 import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NavigableSet;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
 import java.util.UUID;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
-import org.apache.hadoop.hbase.filter.PageFilter;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.compile.RowProjector;
-import org.apache.phoenix.compile.ScanRanges;
-import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.filter.ColumnProjectionFilter;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.job.JobManager.JobCallable;
-import org.apache.phoenix.parse.FilterableStatement;
-import org.apache.phoenix.parse.HintNode.Hint;
-import org.apache.phoenix.query.ConnectionQueryServices;
-import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.MetaDataClient;
-import org.apache.phoenix.schema.PColumnFamily;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.PTable.IndexType;
-import org.apache.phoenix.schema.PTable.ViewType;
-import org.apache.phoenix.schema.SaltingUtil;
-import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
-import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.schema.stats.GuidePostsInfo;
-import org.apache.phoenix.schema.stats.PTableStats;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.LogUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.SQLCloseables;
 import org.apache.phoenix.util.ScanUtil;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.ServerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
 
@@ -90,537 +45,43 @@ import com.google.common.collect.Lists;
  * 
  * @since 0.1
  */
-public class ParallelIterators extends ExplainTable implements ResultIterators {
+public class ParallelIterators extends BaseResultIterators {
 	private static final Logger logger = LoggerFactory.getLogger(ParallelIterators.class);
-    private final List<List<Scan>> scans;
-    private final List<KeyRange> splits;
-    private final PTableStats tableStats;
-    private final byte[] physicalTableName;
-    private final QueryPlan plan;
+	private static final String NAME = "PARALLEL";
     private final ParallelIteratorFactory iteratorFactory;
     
-    public static interface ParallelIteratorFactory {
-        PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException;
-    }
-
-    private static final int DEFAULT_THREAD_TIMEOUT_MS = 60000; // 1min
-    private static final int ESTIMATED_GUIDEPOSTS_PER_REGION = 20;
-
-    static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new Function<HRegionLocation, KeyRange>() {
-        @Override
-        public KeyRange apply(HRegionLocation region) {
-            return KeyRange.getKeyRange(region.getRegionInfo().getStartKey(), region.getRegionInfo().getEndKey());
-        }
-    };
-
-    private PTable getTable() {
-        return plan.getTableRef().getTable();
-    }
-    
-    private boolean useStats() {
-        Scan scan = context.getScan();
-        boolean isPointLookup = context.getScanRanges().isPointLookup();
-        /*
-         *  Don't use guide posts if:
-         *  1) We're doing a point lookup, as HBase is fast enough at those
-         *     to not need them to be further parallelized. TODO: pref test to verify
-         *  2) We're collecting stats, as in this case we need to scan entire
-         *     regions worth of data to track where to put the guide posts.
-         */
-        if (isPointLookup || ScanUtil.isAnalyzeTable(scan)) {
-            return false;
-        }
-        return true;
-    }
-    
     public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory)
             throws SQLException {
-        super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), plan.getOrderBy(), plan.getStatement().getHint());
-        this.plan = plan;
-        StatementContext context = plan.getContext();
-        TableRef tableRef = plan.getTableRef();
-        PTable table = tableRef.getTable();
-        FilterableStatement statement = plan.getStatement();
-        RowProjector projector = plan.getProjector();
-        physicalTableName = table.getPhysicalName().getBytes();
-        tableStats = useStats() ? new MetaDataClient(context.getConnection()).getTableStats(table) : PTableStats.EMPTY_STATS;
-        Scan scan = context.getScan();
-        if (projector.isProjectEmptyKeyValue()) {
-            Map<byte [], NavigableSet<byte []>> familyMap = scan.getFamilyMap();
-            // If nothing projected into scan and we only have one column family, just allow everything
-            // to be projected and use a FirstKeyOnlyFilter to skip from row to row. This turns out to
-            // be quite a bit faster.
-            // Where condition columns also will get added into familyMap
-            // When where conditions are present, we can not add FirstKeyOnlyFilter at beginning.
-            if (familyMap.isEmpty() && context.getWhereCoditionColumns().isEmpty()
-                    && table.getColumnFamilies().size() == 1) {
-                // Project the one column family. We must project a column family since it's possible
-                // that there are other non declared column families that we need to ignore.
-                scan.addFamily(table.getColumnFamilies().get(0).getName().getBytes());
-                ScanUtil.andFilterAtBeginning(scan, new FirstKeyOnlyFilter());
-            } else {
-                byte[] ecf = SchemaUtil.getEmptyColumnFamily(table);
-                // Project empty key value unless the column family containing it has
-                // been projected in its entirety.
-                if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != null) {
-                    scan.addColumn(ecf, QueryConstants.EMPTY_COLUMN_BYTES);
-                }
-            }
-        } else if (table.getViewType() == ViewType.MAPPED) {
-            // Since we don't have the empty key value in MAPPED tables, we must select all CFs in HRS. But only the
-            // selected column values are returned back to client
-            for (PColumnFamily family : table.getColumnFamilies()) {
-                scan.addFamily(family.getName().getBytes());
-            }
-        }
-        
-        // TODO adding all CFs here is not correct. It should be done only after ColumnProjectionOptimization.
-        if (perScanLimit != null) {
-            ScanUtil.andFilterAtEnd(scan, new PageFilter(perScanLimit));
-        }
-
-        doColumnProjectionOptimization(context, scan, table, statement);
-        
+        super(plan, perScanLimit);
         this.iteratorFactory = iteratorFactory;
-        this.scans = getParallelScans();
-        List<KeyRange> splitRanges = Lists.newArrayListWithExpectedSize(scans.size() * ESTIMATED_GUIDEPOSTS_PER_REGION);
-        for (List<Scan> scanList : scans) {
-            for (Scan aScan : scanList) {
-                splitRanges.add(KeyRange.getKeyRange(aScan.getStartRow(), aScan.getStopRow()));
-            }
-        }
-        this.splits = ImmutableList.copyOf(splitRanges);
-    }
-
-    private void doColumnProjectionOptimization(StatementContext context, Scan scan, PTable table, FilterableStatement statement) {
-        Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
-        if (familyMap != null && !familyMap.isEmpty()) {
-            // columnsTracker contain cf -> qualifiers which should get returned.
-            Map<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>> columnsTracker = 
-                    new TreeMap<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>>();
-            Set<byte[]> conditionOnlyCfs = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
-            int referencedCfCount = familyMap.size();
-            for (Pair<byte[], byte[]> whereCol : context.getWhereCoditionColumns()) {
-                if (!(familyMap.containsKey(whereCol.getFirst()))) {
-                    referencedCfCount++;
-                }
-            }
-            boolean useOptimization;
-            if (statement.getHint().hasHint(Hint.SEEK_TO_COLUMN)) {
-                // Do not use the optimization
-                useOptimization = false;
-            } else if (statement.getHint().hasHint(Hint.NO_SEEK_TO_COLUMN)) {
-                // Strictly use the optimization
-                useOptimization = true;
-            } else {
-                // when referencedCfCount is >1 and no Hints, we are not using the optimization
-                useOptimization = referencedCfCount == 1;
-            }
-            if (useOptimization) {
-                for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
-                    ImmutableBytesPtr cf = new ImmutableBytesPtr(entry.getKey());
-                    NavigableSet<byte[]> qs = entry.getValue();
-                    NavigableSet<ImmutableBytesPtr> cols = null;
-                    if (qs != null) {
-                        cols = new TreeSet<ImmutableBytesPtr>();
-                        for (byte[] q : qs) {
-                            cols.add(new ImmutableBytesPtr(q));
-                        }
-                    }
-                    columnsTracker.put(cf, cols);
-                }
-            }
-            // Making sure that where condition CFs are getting scanned at HRS.
-            for (Pair<byte[], byte[]> whereCol : context.getWhereCoditionColumns()) {
-                if (useOptimization) {
-                    if (!(familyMap.containsKey(whereCol.getFirst()))) {
-                        scan.addFamily(whereCol.getFirst());
-                        conditionOnlyCfs.add(whereCol.getFirst());
-                    }
-                } else {
-                    if (familyMap.containsKey(whereCol.getFirst())) {
-                        // where column's CF is present. If there are some specific columns added against this CF, we
-                        // need to ensure this where column also getting added in it.
-                        // If the select was like select cf1.*, then that itself will select the whole CF. So no need to
-                        // specifically add the where column. Adding that will remove the cf1.* stuff and only this
-                        // where condition column will get returned!
-                        NavigableSet<byte[]> cols = familyMap.get(whereCol.getFirst());
-                        // cols is null means the whole CF will get scanned.
-                        if (cols != null) {
-                            scan.addColumn(whereCol.getFirst(), whereCol.getSecond());
-                        }
-                    } else {
-                        // where column's CF itself is not present in family map. We need to add the column
-                        scan.addColumn(whereCol.getFirst(), whereCol.getSecond());
-                    }
-                }
-            }
-            if (useOptimization && !columnsTracker.isEmpty()) {
-                for (ImmutableBytesPtr f : columnsTracker.keySet()) {
-                    // This addFamily will remove explicit cols in scan familyMap and make it as entire row.
-                    // We don't want the ExplicitColumnTracker to be used. Instead we have the ColumnProjectionFilter
-                    scan.addFamily(f.get());
-                }
-                // We don't need this filter for aggregates, as we're not returning back what's
-                // in the scan in this case. We still want the other optimization that causes
-                // the ExplicitColumnTracker not to be used, though.
-                if (!(statement.isAggregate())) {
-                    ScanUtil.andFilterAtEnd(scan, new ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table),
-                            columnsTracker, conditionOnlyCfs));
-                }
-            }
-        }
-    }
-
-    public List<KeyRange> getSplits() {
-        return splits;
-    }
-
-    public List<List<Scan>> getScans() {
-        return scans;
-    }
-
-    private static List<byte[]> toBoundaries(List<HRegionLocation> regionLocations) {
-        int nBoundaries = regionLocations.size() - 1;
-        List<byte[]> ranges = Lists.newArrayListWithExpectedSize(nBoundaries);
-        for (int i = 0; i < nBoundaries; i++) {
-            HRegionInfo regionInfo = regionLocations.get(i).getRegionInfo();
-            ranges.add(regionInfo.getEndKey());
-        }
-        return ranges;
-    }
-    
-    private static int getIndexContainingInclusive(List<byte[]> boundaries, byte[] inclusiveKey) {
-        int guideIndex = Collections.binarySearch(boundaries, inclusiveKey, Bytes.BYTES_COMPARATOR);
-        // If we found an exact match, return the index+1, as the inclusiveKey will be contained
-        // in the next region (since we're matching on the end boundary).
-        guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : (guideIndex + 1));
-        return guideIndex;
-    }
-    
-    private static int getIndexContainingExclusive(List<byte[]> boundaries, byte[] exclusiveKey) {
-        int guideIndex = Collections.binarySearch(boundaries, exclusiveKey, Bytes.BYTES_COMPARATOR);
-        // If we found an exact match, return the index we found as the exclusiveKey won't be
-        // contained in the next region as with getIndexContainingInclusive.
-        guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : guideIndex);
-        return guideIndex;
-    }
-    
-    private List<byte[]> getGuidePosts() {
-        /*
-         *  Don't use guide posts if:
-         *  1) We're doing a point lookup, as HBase is fast enough at those
-         *     to not need them to be further parallelized. TODO: pref test to verify
-         *  2) We're collecting stats, as in this case we need to scan entire
-         *     regions worth of data to track where to put the guide posts.
-         */
-        if (!useStats()) {
-            return Collections.emptyList();
-        }
-        
-        List<byte[]> gps = null;
-        PTable table = getTable();
-        Map<byte[],GuidePostsInfo> guidePostMap = tableStats.getGuidePosts();
-        byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(getTable());
-        if (table.getColumnFamilies().isEmpty()) {
-            // For sure we can get the defaultCF from the table
-            if (guidePostMap.get(defaultCF) != null) {
-                gps = guidePostMap.get(defaultCF).getGuidePosts();
-            }
-        } else {
-            Scan scan = context.getScan();
-            if (scan.getFamilyMap().size() > 0 && !scan.getFamilyMap().containsKey(defaultCF)) {
-                // If default CF is not used in scan, use first CF referenced in scan
-                GuidePostsInfo guidePostsInfo = guidePostMap.get(scan.getFamilyMap().keySet().iterator().next());
-                if (guidePostsInfo != null) {
-                    gps = guidePostsInfo.getGuidePosts();
-                }
-            } else {
-                // Otherwise, favor use of default CF.
-                if (guidePostMap.get(defaultCF) != null) {
-                    gps = guidePostMap.get(defaultCF).getGuidePosts();
-                }
-            }
-        }
-        if (gps == null) {
-            return Collections.emptyList();
-        }
-        return gps;
-    }
-    
-    private static String toString(List<byte[]> gps) {
-        StringBuilder buf = new StringBuilder(gps.size() * 100);
-        buf.append("[");
-        for (int i = 0; i < gps.size(); i++) {
-            buf.append(Bytes.toStringBinary(gps.get(i)));
-            buf.append(",");
-            if (i > 0 && i < gps.size()-1 && (i % 10) == 0) {
-                buf.append("\n");
-            }
-        }
-        buf.setCharAt(buf.length()-1, ']');
-        return buf.toString();
-    }
-    
-    private List<Scan> addNewScan(List<List<Scan>> parallelScans, List<Scan> scans, Scan scan, byte[] startKey, boolean crossedRegionBoundary) {
-        PTable table = getTable();
-        boolean startNewScanList = false;
-        if (!plan.isRowKeyOrdered()) {
-            startNewScanList = true;
-        } else if (crossedRegionBoundary) {
-            if (table.getIndexType() == IndexType.LOCAL) {
-                startNewScanList = true;
-            } else if (table.getBucketNum() != null) {
-                startNewScanList = scans.isEmpty() ||
-                        ScanUtil.crossesPrefixBoundary(startKey,
-                                ScanUtil.getPrefix(scans.get(scans.size()-1).getStartRow(), SaltingUtil.NUM_SALTING_BYTES), 
-                                SaltingUtil.NUM_SALTING_BYTES);
-            }
-        }
-        if (scan != null) {
-            scans.add(scan);
-        }
-        if (startNewScanList && !scans.isEmpty()) {
-            parallelScans.add(scans);
-            scans = Lists.newArrayListWithExpectedSize(1);
-        }
-        return scans;
-    }
-
-    private List<List<Scan>> getParallelScans() throws SQLException {
-        return getParallelScans(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
     }
 
-    /**
-     * Compute the list of parallel scans to run for a given query. The inner scans
-     * may be concatenated together directly, while the other ones may need to be
-     * merge sorted, depending on the query.
-     * @return list of parallel scans to run for a given query.
-     * @throws SQLException
-     */
-    private List<List<Scan>> getParallelScans(byte[] startKey, byte[] stopKey) throws SQLException {
-        Scan scan = context.getScan();
-        List<HRegionLocation> regionLocations = context.getConnection().getQueryServices()
-                .getAllTableRegions(physicalTableName);
-        
-        List<byte[]> regionBoundaries = toBoundaries(regionLocations);
-        ScanRanges scanRanges = context.getScanRanges();
-        PTable table = getTable();
-        boolean isSalted = table.getBucketNum() != null;
-        boolean isLocalIndex = table.getIndexType() == IndexType.LOCAL;
-        List<byte[]> gps = getGuidePosts();
-        if (logger.isDebugEnabled()) {
-            logger.debug("Guideposts: " + toString(gps));
-        }
-        boolean traverseAllRegions = isSalted || isLocalIndex;
-        if (!traverseAllRegions) {
-            byte[] scanStartRow = scan.getStartRow();
-            if (scanStartRow.length != 0 && Bytes.compareTo(scanStartRow, startKey) > 0) {
-                startKey = scanStartRow;
-            }
-            byte[] scanStopRow = scan.getStopRow();
-            if (stopKey.length == 0 || Bytes.compareTo(scanStopRow, stopKey) < 0) {
-                stopKey = scanStopRow;
-            }
-        }
-        
-        int regionIndex = 0;
-        int stopIndex = regionBoundaries.size();
-        if (startKey.length > 0) {
-            regionIndex = getIndexContainingInclusive(regionBoundaries, startKey);
-        }
-        if (stopKey.length > 0) {
-            stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), stopKey));
-            if (isLocalIndex) {
-                stopKey = regionLocations.get(stopIndex).getRegionInfo().getEndKey();
-            }
-        }
-        List<List<Scan>> parallelScans = Lists.newArrayListWithExpectedSize(stopIndex - regionIndex + 1);
-        
-        byte[] currentKey = startKey;
-        int guideIndex = currentKey.length == 0 ? 0 : getIndexContainingInclusive(gps, currentKey);
-        int gpsSize = gps.size();
-        int estGuidepostsPerRegion = gpsSize == 0 ? 1 : gpsSize / regionLocations.size() + 1;
-        int keyOffset = 0;
-        List<Scan> scans = Lists.newArrayListWithExpectedSize(estGuidepostsPerRegion);
-        // Merge bisect with guideposts for all but the last region
-        while (regionIndex <= stopIndex) {
-            byte[] currentGuidePost, endKey, endRegionKey = EMPTY_BYTE_ARRAY;
-            if (regionIndex == stopIndex) {
-                endKey = stopKey;
-            } else {
-                endKey = regionBoundaries.get(regionIndex);
-            }
-            if (isLocalIndex) {
-                HRegionInfo regionInfo = regionLocations.get(regionIndex).getRegionInfo();
-                endRegionKey = regionInfo.getEndKey();
-                keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), endRegionKey);
-            }
-            while (guideIndex < gpsSize
-                    && (Bytes.compareTo(currentGuidePost = gps.get(guideIndex), endKey) <= 0 || endKey.length == 0)) {
-                Scan newScan = scanRanges.intersectScan(scan, currentKey, currentGuidePost, keyOffset, false);
-                scans = addNewScan(parallelScans, scans, newScan, currentGuidePost, false);
-                currentKey = currentGuidePost;
-                guideIndex++;
-            }
-            Scan newScan = scanRanges.intersectScan(scan, currentKey, endKey, keyOffset, true);
-            if (isLocalIndex) {
-                if (newScan != null) {
-                    newScan.setAttribute(EXPECTED_UPPER_REGION_KEY, endRegionKey);
-                } else if (!scans.isEmpty()) {
-                    scans.get(scans.size()-1).setAttribute(EXPECTED_UPPER_REGION_KEY, endRegionKey);
-                }
-            }
-            scans = addNewScan(parallelScans, scans, newScan, endKey, true);
-            currentKey = endKey;
-            regionIndex++;
-        }
-        if (!scans.isEmpty()) { // Add any remaining scans
-            parallelScans.add(scans);
-        }
-        if (logger.isDebugEnabled()) {
-            logger.debug(LogUtil.addCustomAnnotations("The parallelScans: " + parallelScans,
-                    ScanUtil.getCustomAnnotations(scan)));
-        }
-        return parallelScans;
-    }
-
-    public static <T> List<T> reverseIfNecessary(List<T> list, boolean reverse) {
-        if (!reverse) {
-            return list;
-        }
-        return Lists.reverse(list);
-    }
-    
-    private static void addConcatResultIterator(List<PeekingResultIterator> iterators, final List<PeekingResultIterator> concatIterators) {
-        if (!concatIterators.isEmpty()) {
-            iterators.add(ConcatResultIterator.newConcatResultIterator(concatIterators));
-        }
-    }
-    /**
-     * Executes the scan in parallel across all regions, blocking until all scans are complete.
-     * @return the result iterators for the scan of each region
-     */
     @Override
-    public List<PeekingResultIterator> getIterators() throws SQLException {
-        boolean success = false;
-        boolean isReverse = ScanUtil.isReversed(context.getScan());
-        boolean isLocalIndex = getTable().getIndexType() == IndexType.LOCAL;
-        final ConnectionQueryServices services = context.getConnection().getQueryServices();
-        ReadOnlyProps props = services.getProps();
-        int numSplits = size();
-        List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numSplits);
-        List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numSplits);
-        // TODO: what purpose does this scanID serve?
-        final UUID scanId = UUID.randomUUID();
-        try {
-            submitWork(scanId, scans, futures, splits.size());
-            int timeoutMs = props.getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS);
-            boolean clearedCache = false;
-            for (List<Pair<Scan,Future<PeekingResultIterator>>> future : reverseIfNecessary(futures,isReverse)) {
-                List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(future.size());
-                for (Pair<Scan,Future<PeekingResultIterator>> scanPair : reverseIfNecessary(future,isReverse)) {
-                    try {
-                        PeekingResultIterator iterator = scanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
-                        concatIterators.add(iterator);
-                    } catch (ExecutionException e) {
-                        try { // Rethrow as SQLException
-                            throw ServerUtil.parseServerException(e);
-                        } catch (StaleRegionBoundaryCacheException e2) { // Catch only to try to recover from region boundary cache being out of date
-                            List<List<Pair<Scan,Future<PeekingResultIterator>>>> newFutures = Lists.newArrayListWithExpectedSize(2);
-                            if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries
-                                services.clearTableRegionCache(physicalTableName);
-                                clearedCache = true;
-                            }
-                            // Resubmit just this portion of work again
-                            Scan oldScan = scanPair.getFirst();
-                            byte[] startKey = oldScan.getStartRow();
-                            byte[] endKey = oldScan.getStopRow();
-                            if (isLocalIndex) {
-                                endKey = oldScan.getAttribute(EXPECTED_UPPER_REGION_KEY);
-                            }
-                            List<List<Scan>> newNestedScans = this.getParallelScans(startKey, endKey);
-                            // Add any concatIterators that were successful so far
-                            // as we need these to be in order
-                            addConcatResultIterator(iterators, concatIterators);
-                            concatIterators = Collections.emptyList();
-                            submitWork(scanId, newNestedScans, newFutures, newNestedScans.size());
-                            for (List<Pair<Scan,Future<PeekingResultIterator>>> newFuture : reverseIfNecessary(newFutures, isReverse)) {
-                                for (Pair<Scan,Future<PeekingResultIterator>> newScanPair : reverseIfNecessary(newFuture, isReverse)) {
-                                    // Immediate do a get (not catching exception again) and then add the iterators we
-                                    // get back immediately. They'll be sorted as expected, since they're replacing the
-                                    // original one.
-                                    PeekingResultIterator iterator = newScanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
-                                    iterators.add(iterator);
-                                }
-                            }
-                        }
-                    }
-                }
-                addConcatResultIterator(iterators, concatIterators);
-            }
-
-            success = true;
-            return iterators;
-        } catch (SQLException e) {
-            throw e;
-        } catch (Exception e) {
-            throw ServerUtil.parseServerException(e);
-        } finally {
-            if (!success) {
-                SQLCloseables.closeAllQuietly(iterators);
-                // Don't call cancel, as it causes the HConnection to get into a funk
-//                for (Pair<KeyRange,Future<PeekingResultIterator>> future : futures) {
-//                    future.getSecond().cancel(true);
-//                }
-            }
-        }
-    }
-    
-    private static final class ScanLocation {
-    	private final int outerListIndex;
-    	private final int innerListIndex;
-    	private final Scan scan;
-    	public ScanLocation(Scan scan, int outerListIndex, int innerListIndex) {
-    		this.outerListIndex = outerListIndex;
-    		this.innerListIndex = innerListIndex;
-    		this.scan = scan;
-    	}
-    	public int getOuterListIndex() {
-    		return outerListIndex;
-    	}
-    	public int getInnerListIndex() {
-    		return innerListIndex;
-    	}
-    	public Scan getScan() {
-    		return scan;
-    	}
-    }
-    private void submitWork(final UUID scanId, List<List<Scan>> nestedScans,
+    protected void submitWork(final UUID scanId, List<List<Scan>> nestedScans,
             List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, int estFlattenedSize) {
-        final ConnectionQueryServices services = context.getConnection().getQueryServices();
-        ExecutorService executor = services.getExecutor();
         // Pre-populate nestedFutures lists so that we can shuffle the scans
         // and add the future to the right nested list. By shuffling the scans
         // we get better utilization of the cluster since our thread executor
         // will spray the scans across machines as opposed to targeting a
         // single one since the scans are in row key order.
-        List<ScanLocation> scanLocations = Lists.newArrayListWithExpectedSize(estFlattenedSize);
+        ExecutorService executor = context.getConnection().getQueryServices().getExecutor();
+        List<ScanLocator> scanLocations = Lists.newArrayListWithExpectedSize(estFlattenedSize);
         for (int i = 0; i < nestedScans.size(); i++) {
             List<Scan> scans = nestedScans.get(i);
             List<Pair<Scan,Future<PeekingResultIterator>>> futures = Lists.newArrayListWithExpectedSize(scans.size());
             nestedFutures.add(futures);
             for (int j = 0; j < scans.size(); j++) {
             	Scan scan = nestedScans.get(i).get(j);
-                scanLocations.add(new ScanLocation(scan, i, j));
+                scanLocations.add(new ScanLocator(scan, i, j));
                 futures.add(null); // placeholder
             }
         }
+        // Shuffle so that we start execution across many machines
+        // before we fill up the thread pool
         Collections.shuffle(scanLocations);
-        for (ScanLocation scanLocation : scanLocations) {
+        for (ScanLocator scanLocation : scanLocations) {
             final Scan scan = scanLocation.getScan();
-            Future<PeekingResultIterator> future =
-                executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
+            Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
 
                 @Override
                 public PeekingResultIterator call() throws Exception {
@@ -649,22 +110,7 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
     }
 
     @Override
-    public int size() {
-        return this.scans.size();
-    }
-
-    @Override
-    public void explain(List<String> planSteps) {
-        boolean displayChunkCount = context.getConnection().getQueryServices().getProps().getBoolean(
-                QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB,
-                QueryServicesOptions.DEFAULT_EXPLAIN_CHUNK_COUNT);
-        StringBuilder buf = new StringBuilder();
-        buf.append("CLIENT " + (displayChunkCount ? (this.splits.size() + "-CHUNK ") : "") + "PARALLEL " + size() + "-WAY ");
-        explain(buf.toString(),planSteps);
+    protected String getName() {
+        return NAME;
     }
-
-	@Override
-	public String toString() {
-		return "ParallelIterators [scans=" + scans + "]";
-	}
 }
\ No newline at end of file
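
The submitWork that remains above pre-populates placeholder futures in row key order,
flattens and shuffles the scan locations, and then submits them so execution starts on
many region servers at once while results can still be read back in order. A minimal
standalone sketch of that pattern, using plain strings in place of Phoenix's Scan and
PeekingResultIterator (all names here are illustrative):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class ShuffleSubmitSketch {
        // Remembers where a piece of work belongs in the nested result structure.
        static class Slot {
            final int outer, inner;
            final String work;
            Slot(String work, int outer, int inner) { this.work = work; this.outer = outer; this.inner = inner; }
        }

        public static void main(String[] args) throws Exception {
            List<List<String>> nestedWork =
                    List.of(List.of("scan-a", "scan-b"), List.of("scan-c"));
            ExecutorService executor = Executors.newFixedThreadPool(4);
            // Pre-populate result slots in the original (row key) order.
            List<List<Future<String>>> nestedFutures = new ArrayList<>();
            List<Slot> slots = new ArrayList<>();
            for (int i = 0; i < nestedWork.size(); i++) {
                List<Future<String>> futures = new ArrayList<>();
                nestedFutures.add(futures);
                for (int j = 0; j < nestedWork.get(i).size(); j++) {
                    slots.add(new Slot(nestedWork.get(i).get(j), i, j));
                    futures.add(null); // placeholder, filled in after submission
                }
            }
            // Shuffle so work is sprayed across servers instead of walking them in key order.
            Collections.shuffle(slots);
            for (Slot slot : slots) {
                Future<String> future = executor.submit(() -> "result of " + slot.work);
                nestedFutures.get(slot.outer).set(slot.inner, future);
            }
            // Consume in the original order, preserving the row key ordering of results.
            for (List<Future<String>> futures : nestedFutures) {
                for (Future<String> future : futures) {
                    System.out.println(future.get());
                }
            }
            executor.shutdown();
        }
    }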

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c1f18f2/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java
index 3051608..ef2b534 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ResultIterators.java
@@ -20,8 +20,13 @@ package org.apache.phoenix.iterate;
 import java.sql.SQLException;
 import java.util.List;
 
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.query.KeyRange;
+
 public interface ResultIterators {
-    public List<PeekingResultIterator> getIterators() throws SQLException;
     public int size();
+    public List<KeyRange> getSplits();
+    public List<List<Scan>> getScans();
     public void explain(List<String> planSteps);
+    public List<PeekingResultIterator> getIterators() throws SQLException;
 }
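
The widened interface lets a plan treat parallel and serial execution interchangeably.
A hypothetical sketch of that choice (the selection itself is presumably made in plan
code outside this part of the diff); it only shows that both classes satisfy the same
contract and share the constructors visible in the hunks above and below:

    import java.sql.SQLException;

    import org.apache.phoenix.compile.QueryPlan;
    import org.apache.phoenix.iterate.ParallelIteratorFactory;
    import org.apache.phoenix.iterate.ParallelIterators;
    import org.apache.phoenix.iterate.ResultIterators;
    import org.apache.phoenix.iterate.SerialIterators;

    public class IteratorChoiceSketch {
        // Hypothetical helper: picks an execution strategy for a plan. The flag is a
        // stand-in for the commit's real check (a limit query filtered only on the
        // leading PK column runs serially).
        static ResultIterators choose(QueryPlan plan, Integer perScanLimit,
                ParallelIteratorFactory factory, boolean runSerially) throws SQLException {
            return runSerially
                    ? new SerialIterators(plan, perScanLimit, factory)
                    : new ParallelIterators(plan, perScanLimit, factory);
        }
    }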

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c1f18f2/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
new file mode 100644
index 0000000..5cb64a0
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.job.JobManager.JobCallable;
+import org.apache.phoenix.trace.util.Tracing;
+import org.apache.phoenix.util.LogUtil;
+import org.apache.phoenix.util.ScanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+
+/**
+ *
+ * Class that executes the scans of each group serially using the ExecutorService provided. The scans within a group are run
+ * one after another, with the results accessible through {@link #getIterators()}
+ *
+ * 
+ * @since 0.1
+ */
+public class SerialIterators extends BaseResultIterators {
+	private static final Logger logger = LoggerFactory.getLogger(SerialIterators.class);
+	private static final String NAME = "SERIAL";
+    private final ParallelIteratorFactory iteratorFactory;
+    private final int limit;
+    
+    public SerialIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory)
+            throws SQLException {
+        super(plan, perScanLimit);
+        Preconditions.checkArgument(perScanLimit != null); // must be a limit specified
+        this.iteratorFactory = iteratorFactory;
+        this.limit = perScanLimit;
+    }
+
+    @Override
+    protected void submitWork(final UUID scanId, List<List<Scan>> nestedScans,
+            List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, int estFlattenedSize) {
+        // Submit one task per group of scans. The scans within a group are run
+        // serially inside that task, in row key order, and the group's single
+        // Future is added to nestedFutures so results are consumed in order.
+        ExecutorService executor = context.getConnection().getQueryServices().getExecutor();
+        
+        for (final List<Scan> scans : nestedScans) {
+            Scan firstScan = scans.get(0);
+            Scan lastScan = scans.get(scans.size()-1);
+            final Scan overallScan = ScanUtil.newScan(firstScan);
+            overallScan.setStopRow(lastScan.getStopRow());
+            Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
+
+                @Override
+                public PeekingResultIterator call() throws Exception {
+                	List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(scans.size());
+                	for (final Scan scan : scans) {
+	                    long startTime = System.currentTimeMillis();
+	                    ResultIterator scanner = new TableResultIterator(context, tableRef, scan);
+	                    if (logger.isDebugEnabled()) {
+	                        logger.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan, ScanUtil.getCustomAnnotations(scan)));
+	                    }
+	                    concatIterators.add(iteratorFactory.newIterator(context, scanner, scan));
+                	}
+                	PeekingResultIterator concatIterator = ConcatResultIterator.newIterator(concatIterators);
+                	return new LimitingPeekingResultIterator(concatIterator, limit);
+                }
+
+                /**
+                 * Defines the grouping for round robin behavior.  All threads spawned to process
+                 * this scan will be grouped together and time sliced with other simultaneously
+                 * executing parallel scans.
+                 */
+                @Override
+                public Object getJobId() {
+                    return SerialIterators.this;
+                }
+            }, "Parallel scanner for table: " + tableRef.getTable().getName().getString()));
+            // Add our singleton Future which will execute serially
+            nestedFutures.add(Collections.singletonList(new Pair<Scan,Future<PeekingResultIterator>>(overallScan,future)));
+        }
+    }
+
+    @Override
+    protected String getName() {
+        return NAME;
+    }
+}
\ No newline at end of file
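
A rough generic sketch of the execution pattern inside each submitted task above (plain
strings stand in for Phoenix's Scan and result iterators; names are illustrative): the
scans of a group are walked strictly one after another and the concatenated output is
cut off at the limit, mirroring the concat + LimitingPeekingResultIterator combination.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;

    public class SerialLimitSketch {
        // Hypothetical stand-in for running one scan and collecting its rows.
        static List<String> runScan(String scan) {
            return List.of(scan + "-row1", scan + "-row2");
        }

        // One task per scan group: the scans run strictly one after another and the
        // concatenated output is truncated at the limit.
        static Callable<List<String>> serialTask(List<String> scans, int limit) {
            return () -> {
                List<String> rows = new ArrayList<>();
                for (String scan : scans) {
                    for (String row : runScan(scan)) {
                        if (rows.size() >= limit) {
                            return rows; // limit reached; remaining scans in the sketch never run
                        }
                        rows.add(row);
                    }
                }
                return rows;
            };
        }

        public static void main(String[] args) throws Exception {
            // Prints the first three rows only, even though three scans were supplied.
            System.out.println(serialTask(List.of("scan-a", "scan-b", "scan-c"), 3).call());
        }
    }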

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c1f18f2/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
index 2a5080e..a343b48 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
 import org.apache.phoenix.memory.MemoryManager;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.query.QueryServices;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c1f18f2/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index 8f6d026..bf6de8a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -30,7 +30,7 @@ import org.apache.phoenix.compile.IndexStatementRewriter;
 import org.apache.phoenix.compile.QueryCompiler;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.SequenceManager;
-import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
+import org.apache.phoenix.iterate.ParallelIteratorFactory;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.parse.HintNode.Hint;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c1f18f2/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 4894b18..6c03780 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -149,6 +149,7 @@ public class QueryServicesOptions {
     public static final double DEFAULT_TRACING_PROBABILITY_THRESHOLD = 0.05;
 
     public static final int DEFAULT_STATS_UPDATE_FREQ_MS = 15 * 60000; // 15min
+    public static final int DEFAULT_STATS_GUIDEPOST_PER_REGION = 0; // Uses guidepost width by default
     public static final long DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES = 100 * 1024 *1024; // 100MB
     public static final boolean DEFAULT_STATS_USE_CURRENT_TIME = true;
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c1f18f2/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
index 90c8324..9c85e63 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
@@ -25,11 +25,8 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -40,7 +37,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.util.ByteUtil;
@@ -74,21 +70,14 @@ public class StatisticsCollector {
 
     public StatisticsCollector(RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp) throws IOException {
         Configuration config = env.getConfiguration();
-        HTableInterface statsHTable = env.getTable(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES));
-        int guidepostPerRegion = config.getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB, 0);
-        if (guidepostPerRegion > 0) {
-            long maxFileSize = statsHTable.getTableDescriptor().getMaxFileSize();
-            if (maxFileSize <= 0) { // HBase brain dead API doesn't give you the "real" max file size if it's not set...
-                maxFileSize = HConstants.DEFAULT_MAX_FILE_SIZE;
-            }
-            guidepostDepth = maxFileSize / guidepostPerRegion;
-        } else {
-            guidepostDepth = config.getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
-                    QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES);
-        }
+        int guidepostPerRegion = config.getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB, 
+                QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION);
+        long guidepostWidth = config.getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
+                QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES);
+        this.guidepostDepth = StatisticsUtil.getGuidePostDepth(guidepostPerRegion, guidepostWidth, env.getRegion().getTableDesc());
         // Get the stats table associated with the current table on which the CP is
         // triggered
-        this.statsTable = StatisticsWriter.newWriter(statsHTable, tableName, clientTimeStamp);
+        this.statsTable = StatisticsWriter.newWriter(env, tableName, clientTimeStamp);
     }
     
     public long getMaxTimeStamp() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c1f18f2/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
index d8ffd84..2a7047f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
@@ -24,6 +24,8 @@ import java.util.TreeMap;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTableInterface;
@@ -110,4 +112,19 @@ public class StatisticsUtil {
         }
         return PTableStats.EMPTY_STATS;
     }
+    
+    public static long getGuidePostDepth(int guidepostPerRegion, long guidepostWidth, HTableDescriptor tableDesc) {
+        if (guidepostPerRegion > 0) {
+            long maxFileSize = HConstants.DEFAULT_MAX_FILE_SIZE;
+            if (tableDesc != null) {
+                long tableMaxFileSize = tableDesc.getMaxFileSize();
+                if (tableMaxFileSize >= 0) {
+                    maxFileSize = tableMaxFileSize;
+                }
+            }
+            return maxFileSize / guidepostPerRegion;
+        } else {
+            return guidepostWidth;
+        }
+    }
 }
\ No newline at end of file
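
A small worked example of the depth calculation added above (the 100MB width is this
patch's default; the other numbers are assumptions for illustration):

    public class GuidePostDepthExample {
        // Mirrors the logic of StatisticsUtil.getGuidePostDepth with plain numbers.
        public static void main(String[] args) {
            long maxFileSize = 10L * 1024 * 1024 * 1024; // assume a 10GB region max file size
            int guidepostPerRegion = 20;                 // hypothetical per-region setting
            long guidepostWidth = 100L * 1024 * 1024;    // 100MB default width from this patch
            long depth = guidepostPerRegion > 0
                    ? maxFileSize / guidepostPerRegion   // 536870912 bytes, i.e. 512MB
                    : guidepostWidth;                    // used when the per-region setting is 0 (the new default)
            System.out.println("guidepost depth = " + depth + " bytes");
        }
    }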

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c1f18f2/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
index 6681042..f70c327 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
@@ -26,11 +26,13 @@ import java.util.List;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
@@ -43,6 +45,7 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.TimeKeeper;
 
 import com.google.protobuf.ServiceException;
@@ -63,24 +66,31 @@ public class StatisticsWriter implements Closeable {
      * @throws IOException
      *             if the table cannot be created due to an underlying HTable creation error
      */
-    public static StatisticsWriter newWriter(HTableInterface hTable, String tableName, long clientTimeStamp) throws IOException {
+    public static StatisticsWriter newWriter(RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp) throws IOException {
         if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) {
             clientTimeStamp = TimeKeeper.SYSTEM.getCurrentTime();
         }
-        StatisticsWriter statsTable = new StatisticsWriter(hTable, tableName, clientTimeStamp);
+        HTableInterface statsWriterTable = env.getTable(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES));
+        HTableInterface statsReaderTable = ServerUtil.getHTableForCoprocessorScan(env, statsWriterTable);
+        StatisticsWriter statsTable = new StatisticsWriter(statsReaderTable, statsWriterTable, tableName, clientTimeStamp);
         if (clientTimeStamp != StatisticsCollector.NO_TIMESTAMP) { // Otherwise we do this later as we don't know the ts yet
             statsTable.commitLastStatsUpdatedTime();
         }
         return statsTable;
     }
 
-    private final HTableInterface statisticsTable;
+    private final HTableInterface statsWriterTable;
+    // In HBase 0.98.6 or above, the reader and writer will be the same.
+    // In pre-0.98.6 HBase, there was a bug (HBASE-11837) in using the HTable
+    // returned from a coprocessor for scans, so in that case it'll be different.
+    private final HTableInterface statsReaderTable;
     private final byte[] tableName;
     private final long clientTimeStamp;
 
-    private StatisticsWriter(HTableInterface statsTable, String tableName, long clientTimeStamp) {
-        this.statisticsTable = statsTable;
-        this.tableName = PDataType.VARCHAR.toBytes(tableName);
+    private StatisticsWriter(HTableInterface statsReaderTable, HTableInterface statsWriterTable, String tableName, long clientTimeStamp) {
+        this.statsReaderTable = statsReaderTable;
+        this.statsWriterTable = statsWriterTable;
+        this.tableName = Bytes.toBytes(tableName);
         this.clientTimeStamp = clientTimeStamp;
     }
 
@@ -89,7 +99,7 @@ public class StatisticsWriter implements Closeable {
      */
     @Override
     public void close() throws IOException {
-        statisticsTable.close();
+        statsWriterTable.close();
     }
 
     public void splitStats(HRegion p, HRegion l, HRegion r, StatisticsCollector tracker, String fam, List<Mutation> mutations) throws IOException {
@@ -100,7 +110,7 @@ public class StatisticsWriter implements Closeable {
         }
         long readTimeStamp = useMaxTimeStamp ? HConstants.LATEST_TIMESTAMP : clientTimeStamp;
         byte[] famBytes = PDataType.VARCHAR.toBytes(fam);
-        Result result = StatisticsUtil.readRegionStatistics(statisticsTable, tableName, famBytes, p.getRegionName(), readTimeStamp);
+        Result result = StatisticsUtil.readRegionStatistics(statsReaderTable, tableName, famBytes, p.getRegionName(), readTimeStamp);
         if (result != null && !result.isEmpty()) {
         	Cell cell = result.getColumnLatestCell(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES);
 
@@ -211,7 +221,7 @@ public class StatisticsWriter implements Closeable {
                 mrmBuilder.addMutationRequest(ProtobufUtil.toMutation(getMutationType(m), m));
             }
             MutateRowsRequest mrm = mrmBuilder.build();
-            CoprocessorRpcChannel channel = statisticsTable.coprocessorService(row);
+            CoprocessorRpcChannel channel = statsWriterTable.coprocessorService(row);
             MultiRowMutationService.BlockingInterface service =
                     MultiRowMutationService.newBlockingStub(channel);
             try {
@@ -235,7 +245,7 @@ public class StatisticsWriter implements Closeable {
         // Always use wallclock time for this, as it's a mechanism to prevent
         // stats from being collected too often.
         Put put = getLastStatsUpdatedTimePut(clientTimeStamp);
-        statisticsTable.put(put);
+        statsWriterTable.put(put);
     }
     
     public void deleteStats(byte[] regionName, StatisticsCollector tracker, String fam, List<Mutation> mutations)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c1f18f2/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index f1e625b..1ca246f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -33,6 +33,7 @@ import java.util.Properties;
 
 import javax.annotation.Nullable;
 
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.exception.SQLExceptionCode;
@@ -69,6 +70,7 @@ import com.google.common.base.Preconditions;
  */
 public class SchemaUtil {
     private static final int VAR_LENGTH_ESTIMATE = 10;
+    private static final int VAR_KV_LENGTH_ESTIMATE = 50;
     public static final String ESCAPE_CHARACTER = "\"";
     public static final DataBlockEncoding DEFAULT_DATA_BLOCK_ENCODING = DataBlockEncoding.FAST_DIFF;
     public static final PDatum VAR_BINARY_DATUM = new PDatum() {
@@ -110,6 +112,28 @@ public class SchemaUtil {
     public static boolean isPKColumn(PColumn column) {
         return column.getFamilyName() == null;
     }
+  
+    /**
+     * Imperfect estimate of row size given a PTable
+     * TODO: keep row count in stats table and use total size / row count instead
+     * @param table
+     * @return estimate of size in bytes of a row
+     */
+    public static long estimateRowSize(PTable table) {
+    	int keyLength = estimateKeyLength(table);
+    	long rowSize = 0;
+    	for (PColumn column : table.getColumns()) {
+    		if (!SchemaUtil.isPKColumn(column)) {
+                PDataType type = column.getDataType();
+                Integer maxLength = column.getMaxLength();
+                int valueLength = !type.isFixedWidth() ? VAR_KV_LENGTH_ESTIMATE : maxLength == null ? type.getByteSize() : maxLength;
+    			rowSize += KeyValue.getKeyValueDataStructureSize(keyLength, column.getFamilyName().getBytes().length, column.getName().getBytes().length, valueLength);
+    		}
+    	}
+    	// Empty key value
+    	rowSize += KeyValue.getKeyValueDataStructureSize(keyLength, getEmptyColumnFamily(table).length, QueryConstants.EMPTY_COLUMN_BYTES.length, 0);
+    	return rowSize;
+    }
     
     /**
      * Estimate the max key length in bytes of the PK for a given table
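
The new estimateRowSize is what lets the planner turn a row LIMIT into an approximate byte count when deciding whether a scan is small enough to run serially: each non-PK column contributes one KeyValue sized from the row key, family, qualifier, value, and fixed KeyValue overhead, plus one empty key value per row. A rough usage sketch under assumed names; the threshold and helper class below are illustrative, not part of this patch:

    import org.apache.phoenix.schema.PTable;
    import org.apache.phoenix.util.SchemaUtil;

    // Illustrative sketch: the class name and threshold value are assumptions.
    public final class SerialScanHeuristic {
        // Roughly one region's worth of data; tune per deployment.
        private static final long SERIAL_SCAN_BYTES_THRESHOLD = 64L * 1024 * 1024;

        private SerialScanHeuristic() {
        }

        // True if LIMIT rows are expected to amount to a scan small enough to run serially.
        public static boolean fitsSerialScan(PTable table, int limit) {
            long bytesPerRow = SchemaUtil.estimateRowSize(table); // imperfect per-row estimate added above
            return bytesPerRow * (long) limit < SERIAL_SCAN_BYTES_THRESHOLD;
        }
    }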

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c1f18f2/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
index 3327dba..7205faa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
@@ -35,6 +35,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.hbase.index.util.VersionUtil;
 
 
+@SuppressWarnings("deprecation")
 public class ServerUtil {
     private static final int COPROCESSOR_SCAN_WORKS = VersionUtil.encodeVersion("0.98.6");
     
@@ -133,19 +134,34 @@ public class ServerUtil {
         return null;
     }
 
-    @SuppressWarnings("deprecation")
-    public static HTableInterface getHTableForCoprocessorScan (RegionCoprocessorEnvironment env, String tableName) throws IOException {
-        String versionString = env.getHBaseVersion();
-        int version = VersionUtil.encodeVersion(versionString);
-        if (version >= COPROCESSOR_SCAN_WORKS) {
-            // The following *should* work, but doesn't due to HBASE-11837 which was fixed in 0.98.6
-            return env.getTable(TableName.valueOf(tableName));
-        }
-        // This code works around HBASE-11837
+    private static boolean coprocessorScanWorks(RegionCoprocessorEnvironment env) {
+        return (VersionUtil.encodeVersion(env.getHBaseVersion()) >= COPROCESSOR_SCAN_WORKS);
+    }
+    
+    /*
+     * This code works around HBASE-11837 which causes HTableInterfaces retrieved from
+     * RegionCoprocessorEnvironment to not read local data.
+     */
+    private static HTableInterface getTableFromSingletonPool(RegionCoprocessorEnvironment env, byte[] tableName) {
         // It's ok to not ever do a pool.close() as we're storing a single
         // table only. The HTablePool holds no other resources than this table,
         // which will itself be closed when it's no longer needed.
+        @SuppressWarnings("resource")
         HTablePool pool = new HTablePool(env.getConfiguration(),1);
         return pool.getTable(tableName);
     }
+    
+    public static HTableInterface getHTableForCoprocessorScan (RegionCoprocessorEnvironment env, HTableInterface writerTable) throws IOException {
+        if (coprocessorScanWorks(env)) {
+            return writerTable;
+        }
+        return getTableFromSingletonPool(env, writerTable.getTableName());
+    }
+    
+    public static HTableInterface getHTableForCoprocessorScan (RegionCoprocessorEnvironment env, byte[] tableName) throws IOException {
+        if (coprocessorScanWorks(env)) {
+            return env.getTable(TableName.valueOf(tableName));
+        }
+        return getTableFromSingletonPool(env, tableName);
+    }
 }
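
The refactoring leaves two overloads: one that simply hands back the caller's existing writer handle when the environment's getTable() can read local data (HBase 0.98.6+), and one that resolves a handle from a raw table name; on older versions both fall back to the single-table HTablePool. A hedged caller-side sketch using the byte[] overload; the surrounding method is a hypothetical example, not code from the patch:

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.HTableInterface;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.phoenix.util.ServerUtil;

    // Hypothetical coprocessor-side read that must see local data.
    public final class LocalScanExample {
        public static long countRows(RegionCoprocessorEnvironment env, byte[] tableName) throws IOException {
            HTableInterface table = ServerUtil.getHTableForCoprocessorScan(env, tableName);
            try {
                long count = 0;
                ResultScanner scanner = table.getScanner(new Scan());
                try {
                    for (Result r = scanner.next(); r != null; r = scanner.next()) {
                        count++;
                    }
                } finally {
                    scanner.close();
                }
                return count;
            } finally {
                table.close(); // pooled handles are returned to the pool on close
            }
        }
    }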

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c1f18f2/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
index 93b9e9e..e2f6fdb 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
@@ -47,6 +47,7 @@ import org.apache.phoenix.expression.function.SumAggregateFunction;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.PLongColumn;
 import org.apache.phoenix.schema.PName;
@@ -143,6 +144,16 @@ public class AggregateResultScannerTest extends BaseConnectionlessQueryTest {
             @Override
             public void explain(List<String> planSteps) {
             }
+
+			@Override
+			public List<KeyRange> getSplits() {
+				return Collections.emptyList();
+			}
+
+			@Override
+			public List<List<Scan>> getScans() {
+				return Collections.emptyList();
+			}
             
         };
         ResultIterator scanner = new GroupedAggregatingResultIterator(new MergeSortRowKeyResultIterator(iterators), aggregationManager.getAggregators());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c1f18f2/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
index a135729..02fdcea 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/ConcatResultIteratorTest.java
@@ -24,9 +24,10 @@ import java.sql.SQLException;
 import java.util.*;
 
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
-
+import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.AssertResults;
@@ -68,6 +69,15 @@ public class ConcatResultIteratorTest {
             public void explain(List<String> planSteps) {
             }
             
+			@Override
+			public List<KeyRange> getSplits() {
+				return Collections.emptyList();
+			}
+
+			@Override
+			public List<List<Scan>> getScans() {
+				return Collections.emptyList();
+			}
         };
 
         Tuple[] expectedResults = new Tuple[] {
@@ -118,6 +128,15 @@ public class ConcatResultIteratorTest {
             public void explain(List<String> planSteps) {
             }
             
+			@Override
+			public List<KeyRange> getSplits() {
+				return Collections.emptyList();
+			}
+
+			@Override
+			public List<List<Scan>> getScans() {
+				return Collections.emptyList();
+			}
         };
         ResultIterator scanner = new MergeSortRowKeyResultIterator(iterators);
         AssertResults.assertResults(scanner, expectedResults);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c1f18f2/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java
index 9ff088e..095027c 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java
@@ -24,9 +24,10 @@ import java.sql.SQLException;
 import java.util.*;
 
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
-
+import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.AssertResults;
@@ -73,6 +74,15 @@ public class MergeSortResultIteratorTest {
             public void explain(List<String> planSteps) {
             }
             
+			@Override
+			public List<KeyRange> getSplits() {
+				return Collections.emptyList();
+			}
+
+			@Override
+			public List<List<Scan>> getScans() {
+				return Collections.emptyList();
+			}
         };
         ResultIterator scanner = new MergeSortRowKeyResultIterator(iterators);
         AssertResults.assertResults(scanner, expectedResults);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c1f18f2/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java
index 1e3df0b..6139aa5 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryPlanTest.java
@@ -25,6 +25,7 @@ import java.sql.ResultSet;
 import java.sql.Statement;
 import java.util.Properties;
 
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.junit.Test;
 
@@ -183,5 +184,55 @@ public class QueryPlanTest extends BaseConnectionlessQueryTest {
             }
         }
     }
+    
+    @Test
+    public void testTenantSpecificConnWithLimit() throws Exception {
+        String baseTableDDL = "CREATE TABLE BASE_MULTI_TENANT_TABLE(\n " + 
+                "  tenant_id VARCHAR(5) NOT NULL,\n" + 
+                "  userid INTEGER NOT NULL,\n" + 
+                "  username VARCHAR NOT NULL,\n" +
+                "  col VARCHAR\n " + 
+                "  CONSTRAINT pk PRIMARY KEY (tenant_id, userid, username)) MULTI_TENANT=true";
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute(baseTableDDL);
+        conn.close();
+        
+        String tenantId = "tenantId";
+        String tenantViewDDL = "CREATE VIEW TENANT_VIEW AS SELECT * FROM BASE_MULTI_TENANT_TABLE";
+        Properties tenantProps = new Properties();
+        tenantProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        conn = DriverManager.getConnection(getUrl(), tenantProps);
+        conn.createStatement().execute(tenantViewDDL);
+        
+        String query = "EXPLAIN SELECT * FROM TENANT_VIEW LIMIT 1";
+        ResultSet rs = conn.createStatement().executeQuery(query);
+        assertEquals("CLIENT SERIAL 1-WAY RANGE SCAN OVER BASE_MULTI_TENANT_TABLE ['tenantId']\n" + 
+                "    SERVER FILTER BY PageFilter 1\n" + 
+                "    SERVER 1 ROW LIMIT\n" + 
+                "CLIENT 1 ROW LIMIT", QueryUtil.getExplainPlan(rs));
+        query = "EXPLAIN SELECT * FROM TENANT_VIEW LIMIT " + Integer.MAX_VALUE;
+        rs = conn.createStatement().executeQuery(query);
+        assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER BASE_MULTI_TENANT_TABLE ['tenantId']\n" + 
+                "    SERVER FILTER BY PageFilter " + Integer.MAX_VALUE + "\n" + 
+                "    SERVER " + Integer.MAX_VALUE + " ROW LIMIT\n" + 
+                "CLIENT " + Integer.MAX_VALUE + " ROW LIMIT", QueryUtil.getExplainPlan(rs));
+        query = "EXPLAIN SELECT * FROM TENANT_VIEW WHERE username = 'Joe' LIMIT 1";
+        rs = conn.createStatement().executeQuery(query);
+        assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER BASE_MULTI_TENANT_TABLE ['tenantId']\n" + 
+                "    SERVER FILTER BY USERNAME = 'Joe'\n" + 
+                "    SERVER 1 ROW LIMIT\n" + 
+                "CLIENT 1 ROW LIMIT", QueryUtil.getExplainPlan(rs));
+        query = "EXPLAIN SELECT * FROM TENANT_VIEW WHERE col = 'Joe' LIMIT 1";
+        rs = conn.createStatement().executeQuery(query);
+        assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER BASE_MULTI_TENANT_TABLE ['tenantId']\n" + 
+                "    SERVER FILTER BY COL = 'Joe'\n" + 
+                "    SERVER 1 ROW LIMIT\n" + 
+                "CLIENT 1 ROW LIMIT", QueryUtil.getExplainPlan(rs));
+    }
+    
+    @Test
+    public void testLimitOnTenantSpecific() throws Exception {
+        
+    }
 
 }
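
The assertions above capture the behaviour this commit targets: with a tenant-specific connection the leading tenant_id is pinned, so a small LIMIT over the view yields a CLIENT SERIAL plan, while a filter on a non-leading PK column (username), a filter on a non-PK column (col), or an effectively unbounded LIMIT keeps the plan CLIENT PARALLEL. A hedged sketch of making the same check from application code; the helper class is an illustrative assumption, though QueryUtil.getExplainPlan is the same utility the test uses:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;

    import org.apache.phoenix.util.QueryUtil;

    // Illustrative helper: checks whether a query would be executed serially.
    public final class ExplainCheck {
        public static boolean runsSerially(String url, String sql) throws Exception {
            Connection conn = DriverManager.getConnection(url);
            try {
                ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + sql);
                return QueryUtil.getExplainPlan(rs).startsWith("CLIENT SERIAL");
            } finally {
                conn.close();
            }
        }
    }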

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c1f18f2/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
index 2bff620..f6808a8 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
@@ -105,7 +105,7 @@ public final class PhoenixRecordReader extends RecordReader<NullWritable,Phoenix
                 PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
                 iterators.add(peekingResultIterator);
             }
-            ResultIterator iterator = ConcatResultIterator.newConcatResultIterator(iterators);
+            ResultIterator iterator = ConcatResultIterator.newIterator(iterators);
             if(queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) {
                 iterator = new SequenceResultIterator(iterator, queryPlan.getContext().getSequenceManager());
             }