You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2018/04/26 16:52:26 UTC

[5/6] phoenix git commit: PHOENIX-4686 Phoenix stats does not account for server side limit push downs (Abhishek Chouhan)

PHOENIX-4686 Phoenix stats does not account for server side limit push downs (Abhishek Chouhan)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b173aaf7
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b173aaf7
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b173aaf7

Branch: refs/heads/4.x-HBase-1.3
Commit: b173aaf79b282c8be1564d2be9981e7c11d40686
Parents: 2e17712
Author: James Taylor <jt...@salesforce.com>
Authored: Thu Apr 26 09:14:52 2018 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Apr 26 09:51:01 2018 -0700

----------------------------------------------------------------------
 .../end2end/ExplainPlanWithStatsEnabledIT.java  | 49 ++++++++++++++++++-
 .../org/apache/phoenix/execute/ScanPlan.java    |  4 +-
 .../phoenix/iterate/BaseResultIterators.java    | 50 +++++++++++++++++---
 .../phoenix/iterate/ParallelIterators.java      |  8 ++++
 .../apache/phoenix/iterate/SerialIterators.java | 13 ++---
 .../phoenix/schema/stats/StatisticsUtil.java    |  6 +++
 6 files changed, 111 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/b173aaf7/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
index 2099f4c..abaa2f6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
@@ -40,6 +40,7 @@ import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.schema.stats.StatisticsUtil;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EnvironmentEdge;
@@ -123,6 +124,50 @@ public class ExplainPlanWithStatsEnabledIT extends ParallelStatsEnabledIT {
     }
 
     @Test
+    public void testBytesRowsForPointSelectWithLimitGreaterThanPointLookupSize() throws Exception {
+        String sql = "SELECT * FROM " + tableA + " where k in (? ,?) limit 4"; // two keys requested, LIMIT (4) exceeds that
+        List<Object> binds = Lists.newArrayList();
+        binds.add(103); binds.add(104);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            Estimate info = getByteRowEstimates(conn, sql, binds);
+            assertEquals((Long) 200L, info.estimatedBytes);
+            assertEquals((Long) 2L, info.estimatedRows); // expects the two looked-up keys, not the LIMIT of 4
+            assertEquals((Long) StatisticsUtil.NOT_STATS_BASED_TS, info.estimateInfoTs); // estimate is not derived from stats
+        }
+    }
+
+    @Test
+    public void testBytesRowsForSelectWithLimit() throws Exception {
+        String sql = "SELECT * FROM " + tableA + " where c1.a in (?,?) limit 3";
+        String noIndexSQL = "SELECT /*+ NO_INDEX */ * FROM " + tableA + " where c1.a in (?,?) limit 3";
+        List<Object> binds = Lists.newArrayList();
+        binds.add(1); binds.add(2);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            Estimate info = getByteRowEstimates(conn, sql, binds);
+            assertEquals((Long) 264L, info.estimatedBytes);
+            assertEquals((Long) 3L, info.estimatedRows); // row estimate equals the LIMIT of 3
+            assertEquals((Long) StatisticsUtil.NOT_STATS_BASED_TS, info.estimateInfoTs); // estimate is not derived from stats
+
+            info = getByteRowEstimates(conn, noIndexSQL, binds);
+            assertEquals((Long) 634L, info.estimatedBytes);
+            assertEquals((Long) 10L, info.estimatedRows); // with NO_INDEX the LIMIT is not reflected in the estimate
+            assertTrue(info.estimateInfoTs > 0); // positive timestamp, i.e. not NOT_STATS_BASED_TS (0)
+        }
+    }
+
+    @Test
+    public void testBytesRowsForSelectWithLimitIgnored() throws Exception {
+        String sql = "SELECT * FROM " + tableA + " where (c1.a > c2.b) limit 1";
+        List<Object> binds = Lists.newArrayList();
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            Estimate info = getByteRowEstimates(conn, sql, binds);
+            assertEquals((Long) 691L, info.estimatedBytes);
+            assertEquals((Long) 10L, info.estimatedRows); // LIMIT 1 is not reflected in the row estimate
+            assertTrue(info.estimateInfoTs > 0); // positive timestamp, i.e. not NOT_STATS_BASED_TS (0)
+        }
+    }
+
+    @Test
     public void testBytesRowsForSelectWhenKeyInRange() throws Exception {
         String sql = "SELECT * FROM " + tableB + " where k >= ?";
         List<Object> binds = Lists.newArrayList();
@@ -278,7 +323,7 @@ public class ExplainPlanWithStatsEnabledIT extends ParallelStatsEnabledIT {
             Estimate info = getByteRowEstimates(conn, sql, binds);
             assertEquals((Long) 200L, info.estimatedBytes);
             assertEquals((Long) 2L, info.estimatedRows);
-            assertTrue(info.estimateInfoTs > 0);
+            assertEquals((Long) StatisticsUtil.NOT_STATS_BASED_TS, info.estimateInfoTs);
         }
     }
 
@@ -305,7 +350,7 @@ public class ExplainPlanWithStatsEnabledIT extends ParallelStatsEnabledIT {
             Estimate info = getByteRowEstimates(conn, sql, binds);
             assertEquals((Long) 176L, info.estimatedBytes);
             assertEquals((Long) 2L, info.estimatedRows);
-            assertTrue(info.estimateInfoTs > 0);
+            assertEquals((Long) StatisticsUtil.NOT_STATS_BASED_TS, info.estimateInfoTs);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b173aaf7/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index ed145a4..cdb2da5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -68,8 +68,8 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.stats.StatisticsUtil;
 import org.apache.phoenix.util.CostUtil;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
@@ -124,7 +124,7 @@ public class ScanPlan extends BaseQueryPlan {
         if (isSerial) {
             serialBytesEstimate = estimate.getFirst();
             serialRowsEstimate = estimate.getSecond();
-            serialEstimateInfoTs = EnvironmentEdgeManager.currentTimeMillis();
+            serialEstimateInfoTs = StatisticsUtil.NOT_STATS_BASED_TS;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b173aaf7/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 682d1ed..aa9a9f5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -76,6 +77,7 @@ import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.filter.BooleanExpressionFilter;
 import org.apache.phoenix.filter.ColumnProjectionFilter;
 import org.apache.phoenix.filter.DistinctPrefixFilter;
 import org.apache.phoenix.filter.EncodedQualifiersColumnProjectionFilter;
@@ -170,6 +172,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         return plan.getTableRef().getTable();
     }
     
+    abstract protected boolean isSerial();
+    
     protected boolean useStats() {
         /*
          * Don't use guide posts:
@@ -180,7 +184,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         if (ScanUtil.isAnalyzeTable(scan)) {
             return false;
         }
-        return true;
+        return !isSerial();
     }
     
     private static void initializeScan(QueryPlan plan, Integer perScanLimit, Integer offset, Scan scan) throws SQLException {
@@ -1105,10 +1109,25 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                 }
                 regionIndex++;
             }
-            if (scanRanges.isPointLookup()) {
-                this.estimatedRows = Long.valueOf(scanRanges.getPointLookupCount());
+            if (!scans.isEmpty()) { // Add any remaining scans
+                parallelScans.add(scans);
+            }
+            Long pageLimit = getUnfilteredPageLimit(scan);
+            if (scanRanges.isPointLookup() || pageLimit != null) {
+                // If run in parallel, the limit is pushed to each parallel scan so must be accounted for in all of them
+                int parallelFactor = this.isSerial() ? 1 : parallelScans.size();
+                if (scanRanges.isPointLookup() && pageLimit != null) {
+                    this.estimatedRows = Long.valueOf(Math.min(scanRanges.getPointLookupCount(), pageLimit * parallelFactor));
+                } else if (scanRanges.isPointLookup()) {
+                    this.estimatedRows = Long.valueOf(scanRanges.getPointLookupCount());
+                } else {
+                    this.estimatedRows = Long.valueOf(pageLimit) * parallelFactor;
+                }
                 this.estimatedSize = this.estimatedRows * SchemaUtil.estimateRowSize(table);
-                this.estimateInfoTimestamp = computeMinTimestamp(gpsAvailableForAllRegions, estimates, fallbackTs);
+                 // Indication to client that the statistics estimates were not
+                 // calculated based on statistics but instead are based on row
+                 // limits from the query.
+               this.estimateInfoTimestamp = StatisticsUtil.NOT_STATS_BASED_TS;
             } else if (emptyGuidePost) {
                 // In case of an empty guide post, we estimate the number of rows scanned by
                 // using the estimated row size
@@ -1124,9 +1143,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                 this.estimatedSize = null;
                 this.estimateInfoTimestamp = null;
             }
-            if (!scans.isEmpty()) { // Add any remaining scans
-                parallelScans.add(scans);
-            }
         } finally {
             if (stream != null) Closeables.closeQuietly(stream);
         }
@@ -1134,6 +1150,26 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         return parallelScans;
     }
 
+    /**
+     * Returns the page size of the scan's PageFilter, or null if the scan has
+     * no PageFilter or also carries a BooleanExpressionFilter (where clause).
+     * @return the row limit as a Long, or null when no usable limit exists
+     */
+    private static Long getUnfilteredPageLimit(Scan scan) {
+        Long pageLimit = null;
+        Iterator<Filter> filters = ScanUtil.getFilterIterator(scan);
+        while (filters.hasNext()) {
+            Filter filter = filters.next();
+            if (filter instanceof BooleanExpressionFilter) {
+                return null; // a BooleanExpressionFilter anywhere in the scan disables the limit
+            }
+            if (filter instanceof PageFilter) {
+                pageLimit = ((PageFilter)filter).getPageSize(); // keep iterating; the last PageFilter seen wins
+            }
+        }
+        return pageLimit;
+    }
+
     private static Long computeMinTimestamp(boolean gpsAvailableForAllRegions, 
             GuidePostEstimate estimates,
             long fallbackTs) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b173aaf7/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index 3a4b084..41d278d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -69,6 +69,14 @@ public class ParallelIterators extends BaseResultIterators {
         this(plan, perScanLimit, iteratorFactory, DefaultParallelScanGrouper.getInstance(), scan, initOneScanPerRegion, caches, dataPlan);
     }  
 
+    /**
+     * Scans are run in parallel, so this is never serial (see useStats() in BaseResultIterators).
+     */
+    @Override
+    protected boolean isSerial() {
+        return false;
+    }
+    
     @Override
     protected void submitWork(final List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
             final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, final boolean isReverse, ParallelScanGrouper scanGrouper) throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b173aaf7/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index f94a7c9..c13fcdb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -73,6 +73,11 @@ public class SerialIterators extends BaseResultIterators {
     }
 
     @Override
+    protected boolean isSerial() {
+        return true; // makes useStats() in BaseResultIterators return false
+    }
+
+    @Override
     protected void submitWork(final List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
             final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse, final ParallelScanGrouper scanGrouper) {
         ExecutorService executor = context.getConnection().getQueryServices().getExecutor();
@@ -117,14 +122,6 @@ public class SerialIterators extends BaseResultIterators {
         }
     }
 
-    /**
-     * No need to use stats when executing serially
-     */
-    @Override
-    protected boolean useStats() {
-        return false;
-    }
-    
     @Override
     protected String getName() {
         return NAME;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b173aaf7/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
index 0b9c409..4a758b7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
@@ -47,6 +47,12 @@ import com.google.common.collect.Sets;
  * Simple utility class for managing multiple key parts of the statistic
  */
 public class StatisticsUtil {
+    /**
+     * Timestamp value indicating to the client that the estimates were not
+     * calculated from statistics but from row limits, point lookup counts,
+     * or serial-plan estimates of the query.
+     */
+    public static final long NOT_STATS_BASED_TS = 0;
     
     private static final Set<TableName> DISABLE_STATS = Sets.newHashSetWithExpectedSize(8);
     // TODO: make this declarative through new DISABLE_STATS column on SYSTEM.CATALOG table.