You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2016/02/25 21:39:15 UTC

[12/12] phoenix git commit: Temporary fix for PHOENIX-2714

Temporary fix for PHOENIX-2714


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/36ee23ab
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/36ee23ab
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/36ee23ab

Branch: refs/heads/calcite
Commit: 36ee23ab7760616394f32f88b2da6b038cfee1e9
Parents: 06fd581
Author: maryannxue <ma...@gmail.com>
Authored: Thu Feb 25 15:38:55 2016 -0500
Committer: maryannxue <ma...@gmail.com>
Committed: Thu Feb 25 15:38:55 2016 -0500

----------------------------------------------------------------------
 .../phoenix/calcite/rel/PhoenixTableScan.java   |   4 +-
 .../apache/phoenix/execute/AggregatePlan.java   |   2 -
 .../apache/phoenix/execute/BaseQueryPlan.java   |  11 --
 .../org/apache/phoenix/execute/ScanPlan.java    |   2 -
 .../phoenix/iterate/BaseResultIterators.java    | 154 ++++++++++++++++---
 5 files changed, 137 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/36ee23ab/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
index 80c328f..dfdb507 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
@@ -36,6 +36,7 @@ import org.apache.phoenix.execute.ScanPlan;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.iterate.BaseResultIterators;
 import org.apache.phoenix.iterate.ParallelIteratorFactory;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.SelectStatement;
@@ -131,8 +132,7 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel {
                 filterExpr = WhereOptimizer.pushKeyExpressionsToScan(context, select, filterExpr);
                 WhereCompiler.setScanFilter(context, select, filterExpr, true, false);
                 scanRanges = context.getScanRanges();
-                ScanPlan plan = new ScanPlan(context, select, tableRef, RowProjector.EMPTY_PROJECTOR, null, OrderBy.EMPTY_ORDER_BY, null, true, null);
-                estimatedSize = plan.getEstimatedBytes();
+                estimatedSize = BaseResultIterators.getEstimatedCount(context, tableRef.getTable()).getSecond();
             } catch (SQLException e) {
                 throw new RuntimeException(e);
             }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/36ee23ab/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index f9eca0c..794247b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -208,8 +208,6 @@ public class AggregatePlan extends BaseQueryPlan {
 
         splits = iterators.getSplits();
         scans = iterators.getScans();
-        estimatedSize = iterators.getEstimatedByteCount();
-        estimatedRows = iterators.getEstimatedRowCount();
 
         AggregatingResultIterator aggResultIterator;
         // No need to merge sort for ungrouped aggregation

http://git-wip-us.apache.org/repos/asf/phoenix/blob/36ee23ab/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index 0ee70ba..f389fd0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -108,8 +108,6 @@ public abstract class BaseQueryPlan implements QueryPlan {
      * immediately before creating the ResultIterator.
      */
     protected final Expression dynamicFilter;
-    protected Long estimatedRows;
-    protected Long estimatedSize;
     
 
     protected BaseQueryPlan(
@@ -129,16 +127,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
         this.parallelIteratorFactory = parallelIteratorFactory;
         this.dynamicFilter = dynamicFilter;
     }
-
-    public Long getEstimatedRowCount() {
-        return this.estimatedRows;
-    }
     
-    public Long getEstimatedByteCount() {
-        return this.estimatedSize;
-    }
-    
-
 	@Override
 	public Operation getOperation() {
 		return Operation.QUERY;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/36ee23ab/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 72d95be..23bf435 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -208,8 +208,6 @@ public class ScanPlan extends BaseQueryPlan {
         }
         splits = iterators.getSplits();
         scans = iterators.getScans();
-        estimatedSize = iterators.getEstimatedByteCount();
-        estimatedRows = iterators.getEstimatedRowCount();
         
         if (isOrdered) {
             scanner = new MergeSortTopNResultIterator(iterators, limit, orderBy.getOrderByExpressions());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/36ee23ab/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index a48de13..22c0197 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -113,7 +113,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
 
     private final List<List<Scan>> scans;
     private final List<KeyRange> splits;
-    private final PTableStats tableStats;
     private final byte[] physicalTableName;
     private final QueryPlan plan;
     protected final String scanId;
@@ -136,7 +135,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         return plan.getTableRef().getTable();
     }
     
-    private boolean useStats() {
+    private static boolean useStats(StatementContext context) {
         Scan scan = context.getScan();
         boolean isPointLookup = context.getScanRanges().isPointLookup();
         /*
@@ -337,7 +336,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         TableRef tableRef = plan.getTableRef();
         PTable table = tableRef.getTable();
         physicalTableName = table.getPhysicalName().getBytes();
-        tableStats = useStats() ? new MetaDataClient(context.getConnection()).getTableStats(table) : PTableStats.EMPTY_STATS;
         // Used to tie all the scans together during logging
         scanId = UUID.randomUUID().toString();
         
@@ -397,18 +395,18 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         return guideIndex;
     }
 
-    private GuidePostsInfo getGuidePosts(Set<byte[]> whereConditions) {
+    private static GuidePostsInfo getGuidePosts(StatementContext context, PTable table, Set<byte[]> whereConditions) throws SQLException {
         /*
          * Don't use guide posts if: 1) We're doing a point lookup, as HBase is fast enough at those to not need them to
          * be further parallelized. TODO: pref test to verify 2) We're collecting stats, as in this case we need to scan
          * entire regions worth of data to track where to put the guide posts.
          */
-        if (!useStats()) { return GuidePostsInfo.NO_GUIDEPOST; }
+        if (!useStats(context)) { return GuidePostsInfo.NO_GUIDEPOST; }
 
         GuidePostsInfo gps = null;
-        PTable table = getTable();
+        PTableStats tableStats = new MetaDataClient(context.getConnection()).getTableStats(table);
         Map<byte[], GuidePostsInfo> guidePostMap = tableStats.getGuidePosts();
-        byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(getTable());
+        byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(table);
         if (table.getColumnFamilies().isEmpty()) {
             // For sure we can get the defaultCF from the table
             gps = getDefaultFamilyGuidePosts(guidePostMap, defaultCF);
@@ -430,7 +428,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         return gps;
     }
 
-    private GuidePostsInfo getDefaultFamilyGuidePosts(Map<byte[], GuidePostsInfo> guidePostMap, byte[] defaultCF) {
+    private static GuidePostsInfo getDefaultFamilyGuidePosts(Map<byte[], GuidePostsInfo> guidePostMap, byte[] defaultCF) {
         if (guidePostMap.get(defaultCF) != null) {
                 return guidePostMap.get(defaultCF);
         }
@@ -491,7 +489,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                 whereConditions.add(cf);
             }
         }
-        GuidePostsInfo gps = getGuidePosts(whereConditions);
+        GuidePostsInfo gps = getGuidePosts(context, table, whereConditions);
         hasGuidePosts = gps != GuidePostsInfo.NO_GUIDEPOST;
         boolean traverseAllRegions = isSalted || isLocalIndex;
         if (!traverseAllRegions) {
@@ -566,8 +564,10 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                     while (guideIndex < gpsSize && (currentGuidePost.compareTo(endKey) <= 0 || endKey.length == 0)) {
                         Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, currentGuidePostBytes, keyOffset,
                                 false);
-                        estimatedRows += gps.getRowCounts().get(guideIndex);
-                        estimatedSize += gps.getByteCounts().get(guideIndex);
+                        if (newScan != null) {
+                            estimatedRows += gps.getRowCounts().get(guideIndex);
+                            estimatedSize += gps.getByteCounts().get(guideIndex);
+                        }
                         scans = addNewScan(parallelScans, scans, newScan, currentGuidePostBytes, false, regionLocation);
                         currentKeyBytes = currentGuidePost.copyBytes();
                         currentGuidePost = PrefixByteCodec.decode(decoder, input);
@@ -603,6 +603,130 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         return parallelScans;
     }
 
+
+    /**
+     * Estimates rows and bytes scanned by {@code context}'s scan of {@code table}, summed from guidepost counts.
+     * @return (rows, bytes); (1, estimated row size) for a point lookup; (null, null) if no table name/guideposts.
+     * @throws SQLException propagated from the table-stats or region-location lookups
+     */
+    public static Pair<Long, Long> getEstimatedCount(StatementContext context, PTable table) throws SQLException {
+        if (table.getName() == null) { // empty table
+            return new Pair<Long, Long>(null, null);
+        }
+        
+        if (context.getScanRanges().isPointLookup()) { // NOTE(review): always reports 1 row, even for multi-key lookups - confirm
+            return new Pair<Long, Long>(1L, SchemaUtil.estimateRowSize(table));
+        }
+        
+        TreeSet<byte[]> whereConditions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR); // column families referenced by WHERE
+        for(Pair<byte[], byte[]> where : context.getWhereConditionColumns()) {
+            byte[] cf = where.getFirst();
+            if (cf != null) {
+                whereConditions.add(cf);
+            }
+        }
+        GuidePostsInfo gps = getGuidePosts(context, table, whereConditions);
+        if (gps == GuidePostsInfo.NO_GUIDEPOST) { // e.g. stats not used for this scan (see useStats); no estimate
+            return new Pair<Long, Long>(null, null);
+        }
+        
+        byte[] startKey = ByteUtil.EMPTY_BYTE_ARRAY;
+        byte[] stopKey = ByteUtil.EMPTY_BYTE_ARRAY;
+        Scan scan = context.getScan();
+        List<HRegionLocation> regionLocations = context.getConnection().getQueryServices()
+                .getAllTableRegions(table.getPhysicalName().getBytes());
+        List<byte[]> regionBoundaries = toBoundaries(regionLocations);
+        ScanRanges scanRanges = context.getScanRanges();
+        boolean isSalted = table.getBucketNum() != null;
+        boolean isLocalIndex = table.getIndexType() == IndexType.LOCAL;
+        boolean traverseAllRegions = isSalted || isLocalIndex;
+        if (!traverseAllRegions) { // narrow [startKey, stopKey) to the scan's own start/stop rows
+            byte[] scanStartRow = scan.getStartRow();
+            if (scanStartRow.length != 0 && Bytes.compareTo(scanStartRow, startKey) > 0) {
+                startKey = scanStartRow;
+            }
+            byte[] scanStopRow = scan.getStopRow();
+            if (stopKey.length == 0
+                    || (scanStopRow.length != 0 && Bytes.compareTo(scanStopRow, stopKey) < 0)) {
+                stopKey = scanStopRow;
+            }
+        }
+        
+        int regionIndex = 0; // regions [regionIndex, stopIndex] cover the key range computed above
+        int stopIndex = regionBoundaries.size();
+        if (startKey.length > 0) {
+            regionIndex = getIndexContainingInclusive(regionBoundaries, startKey);
+        }
+        if (stopKey.length > 0) {
+            stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), stopKey));
+            if (isLocalIndex) {
+                stopKey = regionLocations.get(stopIndex).getRegionInfo().getEndKey();
+            }
+        }
+        
+        ImmutableBytesWritable currentKey = new ImmutableBytesWritable(startKey);
+        
+        int gpsSize = gps.getGuidePostsCount();
+        int keyOffset = 0;
+        ImmutableBytesWritable currentGuidePost = ByteUtil.EMPTY_IMMUTABLE_BYTE_ARRAY;
+        ImmutableBytesWritable guidePosts = gps.getGuidePosts();
+        ByteArrayInputStream stream = null;
+        DataInput input = null;
+        PrefixByteDecoder decoder = null;
+        int guideIndex = 0;
+        long estimatedRows = 0;
+        long estimatedSize = 0;
+        try {
+            if (gpsSize > 0) {
+                stream = new ByteArrayInputStream(guidePosts.get(), guidePosts.getOffset(), guidePosts.getLength());
+                input = new DataInputStream(stream);
+                decoder = new PrefixByteDecoder(gps.getMaxLength());
+                try { // skip guideposts at or before a non-empty start key
+                    while (currentKey.compareTo(currentGuidePost = PrefixByteCodec.decode(decoder, input)) >= 0
+                            && currentKey.getLength() != 0) {
+                        guideIndex++;
+                    }
+                } catch (EOFException e) {} // presumably: guidepost stream exhausted, nothing left to skip
+            }
+            byte[] currentKeyBytes = currentKey.copyBytes();
+    
+            // Walk regions regionIndex..stopIndex (inclusive); sum counts of guidepost chunks that intersect the scan
+            while (regionIndex <= stopIndex) {
+                byte[] currentGuidePostBytes = currentGuidePost.copyBytes();
+                byte[] endKey, endRegionKey = EMPTY_BYTE_ARRAY;
+                if (regionIndex == stopIndex) {
+                    endKey = stopKey;
+                } else {
+                    endKey = regionBoundaries.get(regionIndex);
+                }
+                HRegionLocation regionLocation = regionLocations.get(regionIndex);
+                if (isLocalIndex) {
+                    HRegionInfo regionInfo = regionLocation.getRegionInfo();
+                    endRegionKey = regionInfo.getEndKey();
+                    keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), endRegionKey);
+                }
+                try {
+                    while (guideIndex < gpsSize && (currentGuidePost.compareTo(endKey) <= 0 || endKey.length == 0)) {
+                        Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, currentGuidePostBytes, keyOffset,
+                                false);
+                        if (newScan != null) { // presumably null means this chunk does not overlap the scan ranges
+                            estimatedRows += gps.getRowCounts().get(guideIndex);
+                            estimatedSize += gps.getByteCounts().get(guideIndex);
+                        }
+                        currentKeyBytes = currentGuidePost.copyBytes();
+                        currentGuidePost = PrefixByteCodec.decode(decoder, input);
+                        currentGuidePostBytes = currentGuidePost.copyBytes();
+                        guideIndex++;
+                    }
+                } catch (EOFException e) {} // presumably: last guidepost consumed; move on to the next region
+                currentKeyBytes = endKey;
+                regionIndex++;
+            }
+        } finally {
+            if (stream != null) Closeables.closeQuietly(stream);
+        }
+        return new Pair<Long, Long>(estimatedRows, estimatedSize);
+    }
    
     public static <T> List<T> reverseIfNecessary(List<T> list, boolean reverse) {
         if (!reverse) {
@@ -884,14 +1008,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         buf.append(getName()).append(" ").append(size()).append("-WAY ");
         explain(buf.toString(),planSteps);
     }
-
-    public Long getEstimatedRowCount() {
-        return this.estimatedRows;
-    }
-    
-    public Long getEstimatedByteCount() {
-        return this.estimatedSize;
-    }
     
     @Override
     public String toString() {