You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2017/10/31 17:12:37 UTC
phoenix git commit: PHOENIX-4287 Incorrect aggregate query results
when stats are disable for parallelization
Repository: phoenix
Updated Branches:
refs/heads/master 54a8f2730 -> 97fe4f8aa
PHOENIX-4287 Incorrect aggregate query results when stats are disable for parallelization
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/97fe4f8a
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/97fe4f8a
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/97fe4f8a
Branch: refs/heads/master
Commit: 97fe4f8aa24d2b0cdf9d1418252b4d69cfb6e7a1
Parents: 54a8f27
Author: Samarth Jain <sa...@apache.org>
Authored: Tue Oct 31 10:12:22 2017 -0700
Committer: Samarth Jain <sa...@apache.org>
Committed: Tue Oct 31 10:12:32 2017 -0700
----------------------------------------------------------------------
.../end2end/ExplainPlanWithStatsEnabledIT.java | 209 ++++++++++++++++++-
.../phoenix/iterate/BaseResultIterators.java | 55 +++--
2 files changed, 246 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/97fe4f8a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
index 62538af..931c398 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.end2end;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_USE_STATS_FOR_PARALLELIZATION;
import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.sql.Connection;
@@ -387,11 +388,8 @@ public class ExplainPlanWithStatsEnabledIT extends ParallelStatsEnabledIT {
@Test
public void testBytesRowsForSelectOnTenantViews() throws Exception {
String tenant1View = generateUniqueName();
- ;
String tenant2View = generateUniqueName();
- ;
String tenant3View = generateUniqueName();
- ;
String multiTenantBaseTable = generateUniqueName();
String tenant1 = "tenant1";
String tenant2 = "tenant2";
@@ -504,6 +502,211 @@ public class ExplainPlanWithStatsEnabledIT extends ParallelStatsEnabledIT {
}
}
+ @Test // See https://issues.apache.org/jira/browse/PHOENIX-4287
+ public void testEstimatesForAggregateQueries() throws Exception {
+ String tableName = generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ int guidePostWidth = 20;
+ String ddl =
+ "CREATE TABLE " + tableName + " (k INTEGER PRIMARY KEY, a bigint, b bigint)"
+ + " GUIDE_POSTS_WIDTH=" + guidePostWidth
+ + ", USE_STATS_FOR_PARALLELIZATION=false";
+ byte[][] splits =
+ new byte[][] { Bytes.toBytes(102), Bytes.toBytes(105), Bytes.toBytes(108) };
+ BaseTest.createTestTable(getUrl(), ddl, splits, null);
+ conn.createStatement().execute("upsert into " + tableName + " values (100,1,3)");
+ conn.createStatement().execute("upsert into " + tableName + " values (101,2,4)");
+ conn.createStatement().execute("upsert into " + tableName + " values (102,2,4)");
+ conn.createStatement().execute("upsert into " + tableName + " values (103,2,4)");
+ conn.createStatement().execute("upsert into " + tableName + " values (104,2,4)");
+ conn.createStatement().execute("upsert into " + tableName + " values (105,2,4)");
+ conn.createStatement().execute("upsert into " + tableName + " values (106,2,4)");
+ conn.createStatement().execute("upsert into " + tableName + " values (107,2,4)");
+ conn.createStatement().execute("upsert into " + tableName + " values (108,2,4)");
+ conn.createStatement().execute("upsert into " + tableName + " values (109,2,4)");
+ conn.commit();
+ conn.createStatement().execute("UPDATE STATISTICS " + tableName + "");
+ }
+ List<Object> binds = Lists.newArrayList();
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String sql = "SELECT COUNT(*) " + " FROM " + tableName;
+ ResultSet rs = conn.createStatement().executeQuery(sql);
+ assertTrue(rs.next());
+ assertEquals(10, rs.getInt(1));
+ Estimate info = getByteRowEstimates(conn, sql, binds);
+ assertEquals((Long) 10l, info.getEstimatedRows());
+ assertTrue(info.getEstimateInfoTs() > 0);
+
+ // Now let's make sure that when using stats for parallelization, our estimates
+ // and query results stay the same
+ conn.createStatement().execute(
+ "ALTER TABLE " + tableName + " SET USE_STATS_FOR_PARALLELIZATION=true");
+ rs = conn.createStatement().executeQuery(sql);
+ assertTrue(rs.next());
+ assertEquals(10, rs.getInt(1));
+ info = getByteRowEstimates(conn, sql, binds);
+ assertEquals((Long) 10l, info.getEstimatedRows());
+ assertTrue(info.getEstimateInfoTs() > 0);
+ }
+ }
+
+ @Test
+ public void testSelectQueriesWithStatsForParallelizationOff() throws Exception {
+ testSelectQueriesWithFilters(false);
+ }
+
+ @Test
+ public void testSelectQueriesWithStatsForParallelizationOn() throws Exception {
+ testSelectQueriesWithFilters(true);
+ }
+
+ private void testSelectQueriesWithFilters(boolean useStatsForParallelization) throws Exception {
+ String tableName = generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ int guidePostWidth = 20;
+ String ddl =
+ "CREATE TABLE " + tableName + " (k INTEGER PRIMARY KEY, a bigint, b bigint)"
+ + " GUIDE_POSTS_WIDTH=" + guidePostWidth
+ + ", USE_STATS_FOR_PARALLELIZATION=" + useStatsForParallelization;
+ byte[][] splits =
+ new byte[][] { Bytes.toBytes(102), Bytes.toBytes(105), Bytes.toBytes(108) };
+ BaseTest.createTestTable(getUrl(), ddl, splits, null);
+ conn.createStatement().execute("upsert into " + tableName + " values (100,100,3)");
+ conn.createStatement().execute("upsert into " + tableName + " values (101,101,4)");
+ conn.createStatement().execute("upsert into " + tableName + " values (102,102,4)");
+ conn.createStatement().execute("upsert into " + tableName + " values (103,103,4)");
+ conn.createStatement().execute("upsert into " + tableName + " values (104,104,4)");
+ conn.createStatement().execute("upsert into " + tableName + " values (105,105,4)");
+ conn.createStatement().execute("upsert into " + tableName + " values (106,106,4)");
+ conn.createStatement().execute("upsert into " + tableName + " values (107,107,4)");
+ conn.createStatement().execute("upsert into " + tableName + " values (108,108,4)");
+ conn.createStatement().execute("upsert into " + tableName + " values (109,109,4)");
+ conn.commit();
+ conn.createStatement().execute("UPDATE STATISTICS " + tableName + "");
+ }
+ List<Object> binds = Lists.newArrayList();
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ // query whose start key is before any data
+ String sql = "SELECT a FROM " + tableName + " WHERE K >= 99";
+ ResultSet rs = conn.createStatement().executeQuery(sql);
+ int i = 0;
+ int numRows = 10;
+ while (rs.next()) {
+ assertEquals(100 + i, rs.getInt(1));
+ i++;
+ }
+ assertEquals(numRows, i);
+ Estimate info = getByteRowEstimates(conn, sql, binds);
+ assertEquals((Long) 10l, info.getEstimatedRows());
+ assertEquals((Long) 930l, info.getEstimatedBytes());
+ assertTrue(info.getEstimateInfoTs() > 0);
+
+ // query whose start key is after any data
+ sql = "SELECT a FROM " + tableName + " WHERE K >= 110";
+ rs = conn.createStatement().executeQuery(sql);
+ assertFalse(rs.next());
+ info = getByteRowEstimates(conn, sql, binds);
+ assertEquals((Long) 0l, info.getEstimatedRows());
+ assertEquals((Long) 0l, info.getEstimatedBytes());
+ assertTrue(info.getEstimateInfoTs() > 0);
+
+ // Query whose end key is before any data
+ sql = "SELECT a FROM " + tableName + " WHERE K <= 98";
+ rs = conn.createStatement().executeQuery(sql);
+ assertFalse(rs.next());
+ info = getByteRowEstimates(conn, sql, binds);
+ assertEquals((Long) 0l, info.getEstimatedRows());
+ assertEquals((Long) 0l, info.getEstimatedBytes());
+ assertTrue(info.getEstimateInfoTs() > 0);
+
+ // Query whose end key is after any data. In this case, we return the estimate as
+ // scanning all the guide posts.
+ sql = "SELECT a FROM " + tableName + " WHERE K <= 110";
+ rs = conn.createStatement().executeQuery(sql);
+ i = 0;
+ numRows = 10;
+ while (rs.next()) {
+ assertEquals(100 + i, rs.getInt(1));
+ i++;
+ }
+ assertEquals(numRows, i);
+ info = getByteRowEstimates(conn, sql, binds);
+ assertEquals((Long) 10l, info.getEstimatedRows());
+ assertEquals((Long) 930l, info.getEstimatedBytes());
+ assertTrue(info.getEstimateInfoTs() > 0);
+
+ // Query whose start key and end key are both before any data. In this case, the
+ // estimate is expected to be
+ // zero rows and zero bytes, as asserted below
+ sql = "SELECT a FROM " + tableName + " WHERE K <= 90 AND K >= 80";
+ rs = conn.createStatement().executeQuery(sql);
+ assertFalse(rs.next());
+ info = getByteRowEstimates(conn, sql, binds);
+ assertEquals((Long) 0l, info.getEstimatedRows());
+ assertEquals((Long) 0l, info.getEstimatedBytes());
+ assertTrue(info.getEstimateInfoTs() > 0);
+
+ // Query whose start key and end key are both after any data. In this case, we return the
+ // estimate as
+ // scanning no guide post
+ sql = "SELECT a FROM " + tableName + " WHERE K <= 130 AND K >= 120";
+ rs = conn.createStatement().executeQuery(sql);
+ assertFalse(rs.next());
+ info = getByteRowEstimates(conn, sql, binds);
+ assertEquals((Long) 0l, info.getEstimatedRows());
+ assertEquals((Long) 0l, info.getEstimatedBytes());
+ assertTrue(info.getEstimateInfoTs() > 0);
+
+ // Query whose start key is before and end key is between data. In this case, the
+ // estimate is expected to be non-zero
+ // (4 rows and 330 bytes, as asserted below)
+ sql = "SELECT a FROM " + tableName + " WHERE K <= 102 AND K >= 90";
+ rs = conn.createStatement().executeQuery(sql);
+ i = 0;
+ numRows = 3;
+ while (rs.next()) {
+ assertEquals(100 + i, rs.getInt(1));
+ i++;
+ }
+ info = getByteRowEstimates(conn, sql, binds);
+ // Depending on the guidepost boundary, this estimate
+ // can be slightly off. It's called estimate for a reason.
+ assertEquals((Long) 4l, info.getEstimatedRows());
+ assertEquals((Long) 330l, info.getEstimatedBytes());
+ assertTrue(info.getEstimateInfoTs() > 0);
+ // Query whose start key is between and end key is after data.
+ sql = "SELECT a FROM " + tableName + " WHERE K <= 120 AND K >= 100";
+ rs = conn.createStatement().executeQuery(sql);
+ i = 0;
+ numRows = 10;
+ while (rs.next()) {
+ assertEquals(100 + i, rs.getInt(1));
+ i++;
+ }
+ info = getByteRowEstimates(conn, sql, binds);
+ // Depending on the guidepost boundary, this estimate
+ // can be slightly off. It's called estimate for a reason.
+ assertEquals((Long) 9l, info.getEstimatedRows());
+ assertEquals((Long) 900l, info.getEstimatedBytes());
+ assertTrue(info.getEstimateInfoTs() > 0);
+ // Query whose start key and end key are both between data.
+ sql = "SELECT a FROM " + tableName + " WHERE K <= 109 AND K >= 100";
+ rs = conn.createStatement().executeQuery(sql);
+ i = 0;
+ numRows = 10;
+ while (rs.next()) {
+ assertEquals(100 + i, rs.getInt(1));
+ i++;
+ }
+ info = getByteRowEstimates(conn, sql, binds);
+ // Depending on the guidepost boundary, this estimate
+ // can be slightly off. It's called estimate for a reason.
+ assertEquals((Long) 9l, info.getEstimatedRows());
+ assertEquals((Long) 900l, info.getEstimatedBytes());
+ assertTrue(info.getEstimateInfoTs() > 0);
+ }
+ }
+
private static void createMultitenantTableAndViews(String tenant1View, String tenant2View,
String tenant3View, String tenant1, String tenant2, String tenant3,
String multiTenantTable, MyClock clock) throws SQLException {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/97fe4f8a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 250cb48..e9deec3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -35,6 +35,7 @@ import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.EOFException;
+import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.BitSet;
@@ -585,15 +586,29 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
return context.getConnection().getQueryServices().getTableStats(key);
}
- private List<Scan> addNewScan(List<List<Scan>> parallelScans, List<Scan> scans, Scan scan, byte[] startKey, boolean crossedRegionBoundary, HRegionLocation regionLocation) {
+ private List<Scan> addNewScan(List<List<Scan>> parallelScans, List<Scan> scans, Scan scan,
+ byte[] startKey, boolean crossedRegionBoundary, HRegionLocation regionLocation,
+ GuidePostEstimate estimate, Long gpsRows, Long gpsBytes) {
boolean startNewScan = scanGrouper.shouldStartNewScan(plan, scans, startKey, crossedRegionBoundary);
if (scan != null) {
if (regionLocation.getServerName() != null) {
scan.setAttribute(BaseScannerRegionObserver.SCAN_REGION_SERVER, regionLocation.getServerName().getVersionedBytes());
}
- scans.add(scan);
+ if (useStatsForParallelization || crossedRegionBoundary) {
+ scans.add(scan);
+ }
+ if (estimate != null && gpsRows != null) {
+ estimate.rowsEstimate += gpsRows;
+ }
+ if (estimate != null && gpsBytes != null) {
+ estimate.bytesEstimate += gpsBytes;
+ }
}
- if (startNewScan && !scans.isEmpty()) {
+ if (startNewScan && !scans.isEmpty() && useStatsForParallelization) {
+ /*
+ * Note that even if region boundary was crossed, if we are not using stats for
+ * parallelization, nothing gets added to the parallel scans.
+ */
parallelScans.add(scans);
scans = Lists.newArrayListWithExpectedSize(1);
}
@@ -653,7 +668,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
newScan.setStopRow(regionInfo.getEndKey());
}
}
- scans = addNewScan(parallelScans, scans, newScan, endKey, true, regionLocation);
+ scans = addNewScan(parallelScans, scans, newScan, endKey, true, regionLocation, null, null, null);
regionIndex++;
}
if (!scans.isEmpty()) { // Add any remaining scans
@@ -662,6 +677,11 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
return parallelScans;
}
+ private static class GuidePostEstimate {
+ private long bytesEstimate;
+ private long rowsEstimate;
+ }
+
/**
* Compute the list of parallel scans to run for a given query. The inner scans
* may be concatenated together directly, while the other ones may need to be
@@ -721,8 +741,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
DataInput input = null;
PrefixByteDecoder decoder = null;
int guideIndex = 0;
- long estimatedRows = 0;
- long estimatedSize = 0;
+ GuidePostEstimate estimates = new GuidePostEstimate();
long estimateTs = Long.MAX_VALUE;
long minGuidePostTimestamp = Long.MAX_VALUE;
try {
@@ -763,6 +782,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
endRegionKey = regionInfo.getEndKey();
keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), endRegionKey);
}
+ byte[] initialKeyBytes = currentKeyBytes;
while (intersectWithGuidePosts && (endKey.length == 0 || currentGuidePost.compareTo(endKey) <= 0)) {
Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, currentGuidePostBytes, keyOffset,
false);
@@ -770,12 +790,11 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
ScanUtil.setLocalIndexAttributes(newScan, keyOffset,
regionInfo.getStartKey(), regionInfo.getEndKey(),
newScan.getStartRow(), newScan.getStopRow());
- estimatedRows += gps.getRowCounts()[guideIndex];
- estimatedSize += gps.getByteCounts()[guideIndex];
- }
- if (useStatsForParallelization) {
- scans = addNewScan(parallelScans, scans, newScan, currentGuidePostBytes, false, regionLocation);
}
+ scans =
+ addNewScan(parallelScans, scans, newScan, currentGuidePostBytes, false,
+ regionLocation, estimates, gps.getRowCounts()[guideIndex],
+ gps.getByteCounts()[guideIndex]);
currentKeyBytes = currentGuidePostBytes;
try {
currentGuidePost = PrefixByteCodec.decode(decoder, input);
@@ -794,12 +813,19 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
intersectWithGuidePosts = false;
}
}
+ if (!useStatsForParallelization) {
+ /*
+ * If we are not using stats for generating parallel scans, we need to reset the
+ * currentKey back to what it was at the beginning of the loop.
+ */
+ currentKeyBytes = initialKeyBytes;
+ }
Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, endKey, keyOffset, true);
if(newScan != null) {
ScanUtil.setLocalIndexAttributes(newScan, keyOffset, regionInfo.getStartKey(),
regionInfo.getEndKey(), newScan.getStartRow(), newScan.getStopRow());
}
- scans = addNewScan(parallelScans, scans, newScan, endKey, true, regionLocation);
+ scans = addNewScan(parallelScans, scans, newScan, endKey, true, regionLocation, null, null, null);
currentKeyBytes = endKey;
regionIndex++;
}
@@ -814,8 +840,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
this.estimatedSize = gps.getByteCounts()[0];
this.estimateInfoTimestamp = gps.getGuidePostTimestamps()[0];
} else if (hasGuidePosts) {
- this.estimatedRows = estimatedRows;
- this.estimatedSize = estimatedSize;
+ this.estimatedRows = estimates.rowsEstimate;
+ this.estimatedSize = estimates.bytesEstimate;
this.estimateInfoTimestamp = estimateTs;
} else {
this.estimatedRows = null;
@@ -828,7 +854,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
} finally {
if (stream != null) Closeables.closeQuietly(stream);
}
-
sampleScans(parallelScans,this.plan.getStatement().getTableSamplingRate());
return parallelScans;
}