You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/11/15 18:48:23 UTC

[28/37] phoenix git commit: PHOENIX-4287 Incorrect aggregate query results when stats are disabled for parallelization

PHOENIX-4287 Incorrect aggregate query results when stats are disabled for parallelization


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/cba2b571
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/cba2b571
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/cba2b571

Branch: refs/heads/4.x-HBase-1.1
Commit: cba2b5719cb39f244f12b79f732233bb9ef6fb4c
Parents: e0df4b2
Author: Samarth Jain <sa...@apache.org>
Authored: Tue Oct 31 10:12:22 2017 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Nov 15 10:46:40 2017 -0800

----------------------------------------------------------------------
 .../end2end/ExplainPlanWithStatsEnabledIT.java  | 209 ++++++++++++++++++-
 .../phoenix/iterate/BaseResultIterators.java    |  55 +++--
 2 files changed, 246 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/cba2b571/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
index 62538af..931c398 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.end2end;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_USE_STATS_FOR_PARALLELIZATION;
 import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.sql.Connection;
@@ -387,11 +388,8 @@ public class ExplainPlanWithStatsEnabledIT extends ParallelStatsEnabledIT {
     @Test
     public void testBytesRowsForSelectOnTenantViews() throws Exception {
         String tenant1View = generateUniqueName();
-        ;
         String tenant2View = generateUniqueName();
-        ;
         String tenant3View = generateUniqueName();
-        ;
         String multiTenantBaseTable = generateUniqueName();
         String tenant1 = "tenant1";
         String tenant2 = "tenant2";
@@ -504,6 +502,211 @@ public class ExplainPlanWithStatsEnabledIT extends ParallelStatsEnabledIT {
         }
     }
 
+    @Test // See https://issues.apache.org/jira/browse/PHOENIX-4287
+    public void testEstimatesForAggregateQueries() throws Exception {
+        String tableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            int guidePostWidth = 20;
+            String ddl =
+                    "CREATE TABLE " + tableName + " (k INTEGER PRIMARY KEY, a bigint, b bigint)"
+                            + " GUIDE_POSTS_WIDTH=" + guidePostWidth
+                            + ", USE_STATS_FOR_PARALLELIZATION=false";
+            byte[][] splits =
+                    new byte[][] { Bytes.toBytes(102), Bytes.toBytes(105), Bytes.toBytes(108) };
+            BaseTest.createTestTable(getUrl(), ddl, splits, null);
+            conn.createStatement().execute("upsert into " + tableName + " values (100,1,3)");
+            conn.createStatement().execute("upsert into " + tableName + " values (101,2,4)");
+            conn.createStatement().execute("upsert into " + tableName + " values (102,2,4)");
+            conn.createStatement().execute("upsert into " + tableName + " values (103,2,4)");
+            conn.createStatement().execute("upsert into " + tableName + " values (104,2,4)");
+            conn.createStatement().execute("upsert into " + tableName + " values (105,2,4)");
+            conn.createStatement().execute("upsert into " + tableName + " values (106,2,4)");
+            conn.createStatement().execute("upsert into " + tableName + " values (107,2,4)");
+            conn.createStatement().execute("upsert into " + tableName + " values (108,2,4)");
+            conn.createStatement().execute("upsert into " + tableName + " values (109,2,4)");
+            conn.commit();
+            conn.createStatement().execute("UPDATE STATISTICS " + tableName + "");
+        }
+        List<Object> binds = Lists.newArrayList();
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String sql = "SELECT COUNT(*) " + " FROM " + tableName;
+            ResultSet rs = conn.createStatement().executeQuery(sql);
+            assertTrue(rs.next());
+            assertEquals(10, rs.getInt(1));
+            Estimate info = getByteRowEstimates(conn, sql, binds);
+            assertEquals((Long) 10l, info.getEstimatedRows());
+            assertTrue(info.getEstimateInfoTs() > 0);
+
+            // Now let's make sure that when using stats for parallelization, our estimates
+            // and query results stay the same
+            conn.createStatement().execute(
+                "ALTER TABLE " + tableName + " SET USE_STATS_FOR_PARALLELIZATION=true");
+            rs = conn.createStatement().executeQuery(sql);
+            assertTrue(rs.next());
+            assertEquals(10, rs.getInt(1));
+            info = getByteRowEstimates(conn, sql, binds);
+            assertEquals((Long) 10l, info.getEstimatedRows());
+            assertTrue(info.getEstimateInfoTs() > 0);
+        }
+    }
+
+    @Test
+    public void testSelectQueriesWithStatsForParallelizationOff() throws Exception {
+        testSelectQueriesWithFilters(false);
+    }
+
+    @Test
+    public void testSelectQueriesWithStatsForParallelizationOn() throws Exception {
+        testSelectQueriesWithFilters(true);
+    }
+
+    private void testSelectQueriesWithFilters(boolean useStatsForParallelization) throws Exception {
+        String tableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            int guidePostWidth = 20;
+            String ddl =
+                    "CREATE TABLE " + tableName + " (k INTEGER PRIMARY KEY, a bigint, b bigint)"
+                            + " GUIDE_POSTS_WIDTH=" + guidePostWidth
+                            + ", USE_STATS_FOR_PARALLELIZATION=" + useStatsForParallelization;
+            byte[][] splits =
+                    new byte[][] { Bytes.toBytes(102), Bytes.toBytes(105), Bytes.toBytes(108) };
+            BaseTest.createTestTable(getUrl(), ddl, splits, null);
+            conn.createStatement().execute("upsert into " + tableName + " values (100,100,3)");
+            conn.createStatement().execute("upsert into " + tableName + " values (101,101,4)");
+            conn.createStatement().execute("upsert into " + tableName + " values (102,102,4)");
+            conn.createStatement().execute("upsert into " + tableName + " values (103,103,4)");
+            conn.createStatement().execute("upsert into " + tableName + " values (104,104,4)");
+            conn.createStatement().execute("upsert into " + tableName + " values (105,105,4)");
+            conn.createStatement().execute("upsert into " + tableName + " values (106,106,4)");
+            conn.createStatement().execute("upsert into " + tableName + " values (107,107,4)");
+            conn.createStatement().execute("upsert into " + tableName + " values (108,108,4)");
+            conn.createStatement().execute("upsert into " + tableName + " values (109,109,4)");
+            conn.commit();
+            conn.createStatement().execute("UPDATE STATISTICS " + tableName + "");
+        }
+        List<Object> binds = Lists.newArrayList();
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            // query whose start key is before any data
+            String sql = "SELECT a FROM " + tableName + " WHERE K >= 99";
+            ResultSet rs = conn.createStatement().executeQuery(sql);
+            int i = 0;
+            int numRows = 10;
+            while (rs.next()) {
+                assertEquals(100 + i, rs.getInt(1));
+                i++;
+            }
+            assertEquals(numRows, i);
+            Estimate info = getByteRowEstimates(conn, sql, binds);
+            assertEquals((Long) 10l, info.getEstimatedRows());
+            assertEquals((Long) 930l, info.getEstimatedBytes());
+            assertTrue(info.getEstimateInfoTs() > 0);
+
+            // query whose start key is after any data
+            sql = "SELECT a FROM " + tableName + " WHERE K >= 110";
+            rs = conn.createStatement().executeQuery(sql);
+            assertFalse(rs.next());
+            info = getByteRowEstimates(conn, sql, binds);
+            assertEquals((Long) 0l, info.getEstimatedRows());
+            assertEquals((Long) 0l, info.getEstimatedBytes());
+            assertTrue(info.getEstimateInfoTs() > 0);
+
+            // Query whose end key is before any data
+            sql = "SELECT a FROM " + tableName + " WHERE K <= 98";
+            rs = conn.createStatement().executeQuery(sql);
+            assertFalse(rs.next());
+            info = getByteRowEstimates(conn, sql, binds);
+            assertEquals((Long) 0l, info.getEstimatedRows());
+            assertEquals((Long) 0l, info.getEstimatedBytes());
+            assertTrue(info.getEstimateInfoTs() > 0);
+
+            // Query whose end key is after any data. In this case, we return the estimate as
+            // scanning all the guide posts.
+            sql = "SELECT a FROM " + tableName + " WHERE K <= 110";
+            rs = conn.createStatement().executeQuery(sql);
+            i = 0;
+            numRows = 10;
+            while (rs.next()) {
+                assertEquals(100 + i, rs.getInt(1));
+                i++;
+            }
+            assertEquals(numRows, i);
+            info = getByteRowEstimates(conn, sql, binds);
+            assertEquals((Long) 10l, info.getEstimatedRows());
+            assertEquals((Long) 930l, info.getEstimatedBytes());
+            assertTrue(info.getEstimateInfoTs() > 0);
+
+            // Query whose start key and end key are both before any data. In this case, the
+            // query returns no rows and
+            // the estimate is expected to be zero rows and zero bytes
+            sql = "SELECT a FROM " + tableName + " WHERE K <= 90 AND K >= 80";
+            rs = conn.createStatement().executeQuery(sql);
+            assertFalse(rs.next());
+            info = getByteRowEstimates(conn, sql, binds);
+            assertEquals((Long) 0l, info.getEstimatedRows());
+            assertEquals((Long) 0l, info.getEstimatedBytes());
+            assertTrue(info.getEstimateInfoTs() > 0);
+
+            // Query whose start key and end key are both after any data. In this case, we return
+            // the estimate as
+            // scanning no guide post
+            sql = "SELECT a FROM " + tableName + " WHERE K <= 130 AND K >= 120";
+            rs = conn.createStatement().executeQuery(sql);
+            assertFalse(rs.next());
+            info = getByteRowEstimates(conn, sql, binds);
+            assertEquals((Long) 0l, info.getEstimatedRows());
+            assertEquals((Long) 0l, info.getEstimatedBytes());
+            assertTrue(info.getEstimateInfoTs() > 0);
+
+            // Query whose start key is before and end key is between data. In this case, the
+            // estimate is expected to be
+            // a partial, non-zero one (4 rows and 330 bytes are asserted below)
+            sql = "SELECT a FROM " + tableName + " WHERE K <= 102 AND K >= 90";
+            rs = conn.createStatement().executeQuery(sql);
+            i = 0;
+            numRows = 3;
+            while (rs.next()) {
+                assertEquals(100 + i, rs.getInt(1));
+                i++;
+            }
+            info = getByteRowEstimates(conn, sql, binds);
+            // Depending on the guidepost boundary, this estimate
+            // can be slightly off. It's called estimate for a reason.
+            assertEquals((Long) 4l, info.getEstimatedRows());
+            assertEquals((Long) 330l, info.getEstimatedBytes());
+            assertTrue(info.getEstimateInfoTs() > 0);
+            // Query whose start key is between and end key is after data.
+            sql = "SELECT a FROM " + tableName + " WHERE K <= 120 AND K >= 100";
+            rs = conn.createStatement().executeQuery(sql);
+            i = 0;
+            numRows = 10;
+            while (rs.next()) {
+                assertEquals(100 + i, rs.getInt(1));
+                i++;
+            }
+            info = getByteRowEstimates(conn, sql, binds);
+            // Depending on the guidepost boundary, this estimate
+            // can be slightly off. It's called estimate for a reason.
+            assertEquals((Long) 9l, info.getEstimatedRows());
+            assertEquals((Long) 900l, info.getEstimatedBytes());
+            assertTrue(info.getEstimateInfoTs() > 0);
+            // Query whose start key and end key are both between data.
+            sql = "SELECT a FROM " + tableName + " WHERE K <= 109 AND K >= 100";
+            rs = conn.createStatement().executeQuery(sql);
+            i = 0;
+            numRows = 10;
+            while (rs.next()) {
+                assertEquals(100 + i, rs.getInt(1));
+                i++;
+            }
+            info = getByteRowEstimates(conn, sql, binds);
+            // Depending on the guidepost boundary, this estimate
+            // can be slightly off. It's called estimate for a reason.
+            assertEquals((Long) 9l, info.getEstimatedRows());
+            assertEquals((Long) 900l, info.getEstimatedBytes());
+            assertTrue(info.getEstimateInfoTs() > 0);
+        }
+    }
+
     private static void createMultitenantTableAndViews(String tenant1View, String tenant2View,
             String tenant3View, String tenant1, String tenant2, String tenant3,
             String multiTenantTable, MyClock clock) throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cba2b571/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 250cb48..e9deec3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -35,6 +35,7 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.EOFException;
+import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.BitSet;
@@ -585,15 +586,29 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         return context.getConnection().getQueryServices().getTableStats(key);
     }
 
-    private List<Scan> addNewScan(List<List<Scan>> parallelScans, List<Scan> scans, Scan scan, byte[] startKey, boolean crossedRegionBoundary, HRegionLocation regionLocation) {
+    private List<Scan> addNewScan(List<List<Scan>> parallelScans, List<Scan> scans, Scan scan,
+            byte[] startKey, boolean crossedRegionBoundary, HRegionLocation regionLocation,
+            GuidePostEstimate estimate, Long gpsRows, Long gpsBytes) {
         boolean startNewScan = scanGrouper.shouldStartNewScan(plan, scans, startKey, crossedRegionBoundary);
         if (scan != null) {
             if (regionLocation.getServerName() != null) {
                 scan.setAttribute(BaseScannerRegionObserver.SCAN_REGION_SERVER, regionLocation.getServerName().getVersionedBytes());
             }
-        	scans.add(scan);
+            if (useStatsForParallelization || crossedRegionBoundary) {
+                scans.add(scan);
+            }
+            if (estimate != null && gpsRows != null) {
+                estimate.rowsEstimate += gpsRows;
+            }
+            if (estimate != null && gpsBytes != null) {
+                estimate.bytesEstimate += gpsBytes;
+            }
         }
-        if (startNewScan && !scans.isEmpty()) {
+        if (startNewScan && !scans.isEmpty() && useStatsForParallelization) {
+            /*
+             * Note that even if region boundary was crossed, if we are not using stats for
+             * parallelization, nothing gets added to the parallel scans.
+             */
             parallelScans.add(scans);
             scans = Lists.newArrayListWithExpectedSize(1);
         }
@@ -653,7 +668,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                     newScan.setStopRow(regionInfo.getEndKey());
                 }
             }
-            scans = addNewScan(parallelScans, scans, newScan, endKey, true, regionLocation);
+            scans = addNewScan(parallelScans, scans, newScan, endKey, true, regionLocation, null, null, null);
             regionIndex++;
         }
         if (!scans.isEmpty()) { // Add any remaining scans
@@ -662,6 +677,11 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         return parallelScans;
     }
 
+    private static class GuidePostEstimate {
+        private long bytesEstimate;
+        private long rowsEstimate;
+    }
+
     /**
      * Compute the list of parallel scans to run for a given query. The inner scans
      * may be concatenated together directly, while the other ones may need to be
@@ -721,8 +741,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         DataInput input = null;
         PrefixByteDecoder decoder = null;
         int guideIndex = 0;
-        long estimatedRows = 0;
-        long estimatedSize = 0;
+        GuidePostEstimate estimates = new GuidePostEstimate();
         long estimateTs = Long.MAX_VALUE;
         long minGuidePostTimestamp = Long.MAX_VALUE;
         try {
@@ -763,6 +782,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                     endRegionKey = regionInfo.getEndKey();
                     keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), endRegionKey);
                 }
+                byte[] initialKeyBytes = currentKeyBytes;
                 while (intersectWithGuidePosts && (endKey.length == 0 || currentGuidePost.compareTo(endKey) <= 0)) {
                     Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, currentGuidePostBytes, keyOffset,
                         false);
@@ -770,12 +790,11 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                         ScanUtil.setLocalIndexAttributes(newScan, keyOffset,
                             regionInfo.getStartKey(), regionInfo.getEndKey(),
                             newScan.getStartRow(), newScan.getStopRow());
-                        estimatedRows += gps.getRowCounts()[guideIndex];
-                        estimatedSize += gps.getByteCounts()[guideIndex];
-                    }
-                    if (useStatsForParallelization) {
-                        scans = addNewScan(parallelScans, scans, newScan, currentGuidePostBytes, false, regionLocation);
                     }
+                    scans =
+                            addNewScan(parallelScans, scans, newScan, currentGuidePostBytes, false,
+                                regionLocation, estimates, gps.getRowCounts()[guideIndex],
+                                gps.getByteCounts()[guideIndex]);
                     currentKeyBytes = currentGuidePostBytes;
                     try {
                         currentGuidePost = PrefixByteCodec.decode(decoder, input);
@@ -794,12 +813,19 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                         intersectWithGuidePosts = false;
                     }
                 }
+                if (!useStatsForParallelization) {
+                    /*
+                     * If we are not using stats for generating parallel scans, we need to reset the
+                     * currentKey back to what it was at the beginning of the loop.
+                     */
+                    currentKeyBytes = initialKeyBytes;
+                }
                 Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, endKey, keyOffset, true);
                 if(newScan != null) {
                     ScanUtil.setLocalIndexAttributes(newScan, keyOffset, regionInfo.getStartKey(),
                         regionInfo.getEndKey(), newScan.getStartRow(), newScan.getStopRow());
                 }
-                scans = addNewScan(parallelScans, scans, newScan, endKey, true, regionLocation);
+                scans = addNewScan(parallelScans, scans, newScan, endKey, true, regionLocation, null, null, null);
                 currentKeyBytes = endKey;
                 regionIndex++;
             }
@@ -814,8 +840,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                 this.estimatedSize = gps.getByteCounts()[0];
                 this.estimateInfoTimestamp = gps.getGuidePostTimestamps()[0];
             } else if (hasGuidePosts) {
-                this.estimatedRows = estimatedRows;
-                this.estimatedSize = estimatedSize;
+                this.estimatedRows = estimates.rowsEstimate;
+                this.estimatedSize = estimates.bytesEstimate;
                 this.estimateInfoTimestamp = estimateTs;
             } else {
                 this.estimatedRows = null;
@@ -828,7 +854,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         } finally {
             if (stream != null) Closeables.closeQuietly(stream);
         }
-        
         sampleScans(parallelScans,this.plan.getStatement().getTableSamplingRate());
         return parallelScans;
     }