You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by an...@apache.org on 2016/06/23 20:54:36 UTC

phoenix git commit: PHOENIX-2949 Fix estimated region size when checking for serial query

Repository: phoenix
Updated Branches:
  refs/heads/master 8a72032e2 -> a44387358


PHOENIX-2949 Fix estimated region size when checking for serial query


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a4438735
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a4438735
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a4438735

Branch: refs/heads/master
Commit: a44387358d6b58b77358a42f38c5baac9e2ab527
Parents: 8a72032
Author: Ankit Singhal <an...@gmail.com>
Authored: Thu Jun 23 13:54:33 2016 -0700
Committer: Ankit Singhal <an...@gmail.com>
Committed: Thu Jun 23 13:54:33 2016 -0700

----------------------------------------------------------------------
 .../org/apache/phoenix/execute/ScanPlan.java    | 46 ++++++++++----------
 .../org/apache/phoenix/query/QueryServices.java |  2 +-
 2 files changed, 25 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/a4438735/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index c55a1cc..0975b3f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -25,7 +25,7 @@ import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
@@ -62,7 +62,6 @@ import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.stats.GuidePostsInfo;
 import org.apache.phoenix.schema.stats.PTableStats;
-import org.apache.phoenix.schema.stats.StatisticsUtil;
 import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ScanUtil;
@@ -118,7 +117,7 @@ public class ScanPlan extends BaseQueryPlan {
         Scan scan = context.getScan();
         /*
          * If a limit is provided and we have no filter, run the scan serially when we estimate that
-         * the limit's worth of data will fit into a single region.
+         * the limit's worth of data is smaller than the byte threshold configured via QueryServices.QUERY_PARALLEL_LIMIT_THRESHOLD (or an estimated default when that property is unset).
          */
         Integer perScanLimit = !allowPageFilter ? null : limit;
         if (perScanLimit == null || scan.getFilter() != null) {
@@ -127,32 +126,35 @@ public class ScanPlan extends BaseQueryPlan {
         long scn = context.getConnection().getSCN() == null ? Long.MAX_VALUE : context.getConnection().getSCN();
         PTableStats tableStats = context.getConnection().getQueryServices().getTableStats(table.getName().getBytes(), scn);
         GuidePostsInfo gpsInfo = tableStats.getGuidePosts().get(SchemaUtil.getEmptyColumnFamily(table));
-        long estRowSize = SchemaUtil.estimateRowSize(table);
-        long estRegionSize;
+        ConnectionQueryServices services = context.getConnection().getQueryServices();
+        long estRowSize;
+        long estimatedParallelThresholdBytes;
         if (gpsInfo == null) {
-            // Use guidepost depth as minimum size
-            ConnectionQueryServices services = context.getConnection().getQueryServices();
-            HTableDescriptor desc = services.getTableDescriptor(table.getPhysicalName().getBytes());
-            int guidepostPerRegion = services.getProps().getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB,
-                    QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION);
-            long guidepostWidth = services.getProps().getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
-                    QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES);
-            estRegionSize = StatisticsUtil.getGuidePostDepth(guidepostPerRegion, guidepostWidth, desc);
+            estRowSize = SchemaUtil.estimateRowSize(table);
+            estimatedParallelThresholdBytes = services.getProps().getLong(HConstants.HREGION_MAX_FILESIZE,
+                    HConstants.DEFAULT_MAX_FILE_SIZE);
         } else {
-            // Region size estimated based on total number of bytes divided by number of regions
             long totByteSize = 0;
+            long totRowCount = 0;
             for (long byteCount : gpsInfo.getByteCounts()) {
                 totByteSize += byteCount;
             }
-            estRegionSize = totByteSize / (gpsInfo.getGuidePostsCount()+1);
+            for (long rowCount : gpsInfo.getRowCounts()) {
+                totRowCount += rowCount;
+            }
+            estRowSize = totByteSize / totRowCount;
+            estimatedParallelThresholdBytes = 2
+                    * services.getProps().getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
+                            QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES);
         }
-        // TODO: configurable number of bytes?
-        boolean isSerial = (perScanLimit * estRowSize < estRegionSize);
-        
-        if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("With LIMIT=" + perScanLimit
-                + ", estimated row size=" + estRowSize
-                + ", estimated region size=" + estRegionSize + " (" + (gpsInfo == null ? "without " : "with ") + "stats)"
-                + ": " + (isSerial ? "SERIAL" : "PARALLEL") + " execution", context.getConnection()));
+        long limitThreshold = services.getProps().getLong(QueryServices.QUERY_PARALLEL_LIMIT_THRESHOLD,
+                estimatedParallelThresholdBytes);
+        boolean isSerial = (perScanLimit * estRowSize < limitThreshold);
+
+        if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations(
+                "With LIMIT=" + perScanLimit + ", estimated row size=" + estRowSize + ", limitThreshold="
+                        + limitThreshold + ": " + (isSerial ? "SERIAL" : "PARALLEL") + " execution",
+                context.getConnection()));
         return isSerial;
     }
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a4438735/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index f5e2a0a..e255e61 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -214,7 +214,7 @@ public interface QueryServices extends SQLCloseable {
     public static final String HCONNECTION_POOL_CORE_SIZE = "hbase.hconnection.threads.core";
     public static final String HCONNECTION_POOL_MAX_SIZE = "hbase.hconnection.threads.max";
     public static final String HTABLE_MAX_THREADS = "hbase.htable.threads.max";
-
+    public static final String QUERY_PARALLEL_LIMIT_THRESHOLD = "phoenix.query.parallelThresholdBytes";
     // time to wait before running second index population upsert select (so that any pending batches of rows on region server are also written to index)
     public static final String INDEX_POPULATION_SLEEP_TIME = "phoenix.index.population.wait.time";
     public static final String LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB = "phoenix.client.localIndexUpgrade";