You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2016/06/27 16:21:04 UTC
phoenix git commit: PHOENIX-3023 Slow performance when limit queries are executed in parallel by default
Repository: phoenix
Updated Branches:
refs/heads/4.x-HBase-0.98 8cec07dca -> e0bf41f98
PHOENIX-3023 Slow performance when limit queries are executed in parallel by default
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e0bf41f9
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e0bf41f9
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e0bf41f9
Branch: refs/heads/4.x-HBase-0.98
Commit: e0bf41f98bfb72c094237fdc3465523e04c1c0ef
Parents: 8cec07d
Author: Samarth <sa...@salesforce.com>
Authored: Mon Jun 27 09:20:51 2016 -0700
Committer: Samarth <sa...@salesforce.com>
Committed: Mon Jun 27 09:20:51 2016 -0700
----------------------------------------------------------------------
.../apache/phoenix/execute/AggregatePlan.java | 2 +-
.../org/apache/phoenix/execute/ScanPlan.java | 65 ++++++++++----------
.../phoenix/iterate/BaseResultIterators.java | 14 ++++-
.../phoenix/iterate/ParallelIterators.java | 32 ++++++----
.../phoenix/iterate/TableResultIterator.java | 4 +-
.../java/org/apache/phoenix/util/ScanUtil.java | 2 +-
.../compile/StatementHintsCompilationTest.java | 8 +--
.../query/ParallelIteratorsSplitTest.java | 2 +-
8 files changed, 75 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0bf41f9/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index afb187a..82d854b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -215,7 +215,7 @@ public class AggregatePlan extends BaseQueryPlan {
}
BaseResultIterators iterators = hasSerialHint && canBeExecutedSerially
? new SerialIterators(this, null, null, wrapParallelIteratorFactory(), scanGrouper, scan)
- : new ParallelIterators(this, null, wrapParallelIteratorFactory(), scan);
+ : new ParallelIterators(this, null, wrapParallelIteratorFactory(), scan, false);
splits = iterators.getSplits();
scans = iterators.getScans();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0bf41f9/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 0975b3f..31354be 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -62,7 +62,6 @@ import org.apache.phoenix.schema.SaltingUtil;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.stats.GuidePostsInfo;
import org.apache.phoenix.schema.stats.PTableStats;
-import org.apache.phoenix.util.LogUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
@@ -101,25 +100,28 @@ public class ScanPlan extends BaseQueryPlan {
}
private static boolean isSerial(StatementContext context, FilterableStatement statement,
- TableRef tableRef, OrderBy orderBy, Integer limit, Integer offset, boolean allowPageFilter) throws SQLException {
- PTable table = tableRef.getTable();
- boolean hasSerialHint = statement.getHint().hasHint(HintNode.Hint.SERIAL);
- boolean canBeExecutedSerially = ScanUtil.canQueryBeExecutedSerially(table, orderBy, context);
- if (!canBeExecutedSerially) {
- if (hasSerialHint) {
- logger.warn("This query cannot be executed serially. Ignoring the hint");
+ TableRef tableRef, OrderBy orderBy, boolean isDataWithinThreshold) throws SQLException {
+ if (isDataWithinThreshold) {
+ PTable table = tableRef.getTable();
+ boolean hasSerialHint = statement.getHint().hasHint(HintNode.Hint.SERIAL);
+ boolean canBeExecutedSerially = ScanUtil.canQueryBeExecutedSerially(table, orderBy, context);
+ if (!canBeExecutedSerially) {
+ if (hasSerialHint) {
+ logger.warn("This query cannot be executed serially. Ignoring the hint");
+ }
+ return false;
}
- return false;
- } else if (hasSerialHint) {
return true;
}
-
+ return false;
+ }
+
+ private static boolean isAmountOfDataToScanWithinThreshold(StatementContext context, PTable table, Integer perScanLimit) throws SQLException {
Scan scan = context.getScan();
/*
- * If a limit is provided and we have no filter, run the scan serially when we estimate that
- * the limit's worth of data is less than the threshold bytes provided in QueryServices.QUERY_PARALLEL_LIMIT_THRESHOLD
+ * If a limit is not provided or if we have a filter, then we are not able to decide whether
+ * the amount of data we need to scan is less than the threshold.
*/
- Integer perScanLimit = !allowPageFilter ? null : limit;
if (perScanLimit == null || scan.getFilter() != null) {
return false;
}
@@ -129,7 +131,7 @@ public class ScanPlan extends BaseQueryPlan {
ConnectionQueryServices services = context.getConnection().getQueryServices();
long estRowSize;
long estimatedParallelThresholdBytes;
- if (gpsInfo == null) {
+ if (gpsInfo == null || gpsInfo.getGuidePostsCount() == 0) {
estRowSize = SchemaUtil.estimateRowSize(table);
estimatedParallelThresholdBytes = services.getProps().getLong(HConstants.HREGION_MAX_FILESIZE,
HConstants.DEFAULT_MAX_FILE_SIZE);
@@ -149,19 +151,13 @@ public class ScanPlan extends BaseQueryPlan {
}
long limitThreshold = services.getProps().getLong(QueryServices.QUERY_PARALLEL_LIMIT_THRESHOLD,
estimatedParallelThresholdBytes);
- boolean isSerial = (perScanLimit * estRowSize < limitThreshold);
-
- if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations(
- "With LIMIT=" + perScanLimit + ", estimated row size=" + estRowSize + ", limitThreshold="
- + limitThreshold + ": " + (isSerial ? "SERIAL" : "PARALLEL") + " execution",
- context.getConnection()));
- return isSerial;
+ return (perScanLimit * estRowSize < limitThreshold);
}
private static ParallelIteratorFactory buildResultIteratorFactory(StatementContext context, FilterableStatement statement,
- TableRef table, OrderBy orderBy, Integer limit,Integer offset, boolean allowPageFilter) throws SQLException {
+ TableRef tableRef, OrderBy orderBy, Integer limit,Integer offset, boolean allowPageFilter) throws SQLException {
- if ((isSerial(context, statement, table, orderBy, limit, offset, allowPageFilter)
+ if ((isSerial(context, statement, tableRef, orderBy, isAmountOfDataToScanWithinThreshold(context, tableRef.getTable(), QueryUtil.getOffsetLimit(limit, offset)))
|| isRoundRobinPossible(orderBy, context) || isPacingScannersPossible(context))) {
return ParallelIteratorFactory.NOOP_FACTORY;
}
@@ -176,7 +172,7 @@ public class ScanPlan extends BaseQueryPlan {
return spoolingResultIteratorFactory;
} else {
return new ChunkedResultIterator.ChunkedResultIteratorFactory(
- spoolingResultIteratorFactory, context.getConnection().getMutationState(), table);
+ spoolingResultIteratorFactory, context.getConnection().getMutationState(), tableRef);
}
}
@@ -215,19 +211,26 @@ public class ScanPlan extends BaseQueryPlan {
* limit is provided, run query serially.
*/
boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
- boolean isSerial = isSerial(context, statement, tableRef, orderBy, limit, offset, allowPageFilter);
Integer perScanLimit = !allowPageFilter || isOrdered ? null : limit;
- if (perScanLimit != null) {
- perScanLimit = QueryUtil.getOffsetLimit(perScanLimit, offset);
- }
- BaseResultIterators iterators;
+ perScanLimit = QueryUtil.getOffsetLimit(perScanLimit, offset);
+ boolean isDataWithinThreshold = isAmountOfDataToScanWithinThreshold(context, table, perScanLimit);
+ boolean isSerial = isSerial(context, statement, tableRef, orderBy, isDataWithinThreshold);
boolean isOffsetOnServer = isOffsetPossibleOnServer(context, orderBy, offset, isSalted, table.getIndexType());
+ /*
+ * For queries that are doing a row key order by and are not possibly querying more than a
+ * threshold worth of data, then we only need to initialize scanners corresponding to the
+ * first (or last, if reverse) scan per region.
+ */
+ boolean initFirstScanOnly =
+ (orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY || orderBy == OrderBy.REV_ROW_KEY_ORDER_BY)
+ && isDataWithinThreshold;
+ BaseResultIterators iterators;
if (isOffsetOnServer) {
iterators = new SerialIterators(this, perScanLimit, offset, parallelIteratorFactory, scanGrouper, scan);
} else if (isSerial) {
iterators = new SerialIterators(this, perScanLimit, null, parallelIteratorFactory, scanGrouper, scan);
} else {
- iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory, scanGrouper, scan);
+ iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory, scanGrouper, scan, initFirstScanOnly);
}
splits = iterators.getSplits();
scans = iterators.getScans();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0bf41f9/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 42fe0d9..7796a17 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -492,7 +492,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
private List<List<Scan>> getParallelScans() throws SQLException {
// If the scan boundaries are not matching with scan in context that means we need to get
// parallel scans for the chunk after split/merge.
- if (!ScanUtil.isConextScan(scan, context)) {
+ if (!ScanUtil.isContextScan(scan, context)) {
return getParallelScans(scan);
}
return getParallelScans(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
@@ -919,11 +919,15 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
private final int outerListIndex;
private final int innerListIndex;
private final Scan scan;
+ private final boolean isFirstScan;
+ private final boolean isLastScan;
- public ScanLocator(Scan scan, int outerListIndex, int innerListIndex) {
+ public ScanLocator(Scan scan, int outerListIndex, int innerListIndex, boolean isFirstScan, boolean isLastScan) {
this.outerListIndex = outerListIndex;
this.innerListIndex = innerListIndex;
this.scan = scan;
+ this.isFirstScan = isFirstScan;
+ this.isLastScan = isLastScan;
}
public int getOuterListIndex() {
return outerListIndex;
@@ -934,6 +938,12 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
public Scan getScan() {
return scan;
}
+ public boolean isFirstScan() {
+ return isFirstScan;
+ }
+ public boolean isLastScan() {
+ return isLastScan;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0bf41f9/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index d038f77..8c9b689 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -53,21 +53,23 @@ public class ParallelIterators extends BaseResultIterators {
private static final Logger logger = LoggerFactory.getLogger(ParallelIterators.class);
private static final String NAME = "PARALLEL";
private final ParallelIteratorFactory iteratorFactory;
+ private final boolean initFirstScanOnly;
- public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan)
+ public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan, boolean initFirstScanOnly)
throws SQLException {
super(plan, perScanLimit, null, scanGrouper, scan);
this.iteratorFactory = iteratorFactory;
+ this.initFirstScanOnly = initFirstScanOnly;
}
- public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, Scan scan)
+ public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, Scan scan, boolean initOneScanPerRegion)
throws SQLException {
- this(plan, perScanLimit, iteratorFactory, DefaultParallelScanGrouper.getInstance(), scan);
+ this(plan, perScanLimit, iteratorFactory, DefaultParallelScanGrouper.getInstance(), scan, initOneScanPerRegion);
}
@Override
- protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
- final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse, ParallelScanGrouper scanGrouper) throws SQLException {
+ protected void submitWork(final List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
+ final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, final boolean isReverse, ParallelScanGrouper scanGrouper) throws SQLException {
// Pre-populate nestedFutures lists so that we can shuffle the scans
// and add the future to the right nested list. By shuffling the scans
// we get better utilization of the cluster since our thread executor
@@ -77,11 +79,12 @@ public class ParallelIterators extends BaseResultIterators {
List<ScanLocator> scanLocations = Lists.newArrayListWithExpectedSize(estFlattenedSize);
for (int i = 0; i < nestedScans.size(); i++) {
List<Scan> scans = nestedScans.get(i);
- List<Pair<Scan,Future<PeekingResultIterator>>> futures = Lists.newArrayListWithExpectedSize(scans.size());
+ int numScans = scans.size();
+ List<Pair<Scan,Future<PeekingResultIterator>>> futures = Lists.newArrayListWithExpectedSize(numScans);
nestedFutures.add(futures);
- for (int j = 0; j < scans.size(); j++) {
+ for (int j = 0; j < numScans; j++) {
Scan scan = nestedScans.get(i).get(j);
- scanLocations.add(new ScanLocator(scan, i, j));
+ scanLocations.add(new ScanLocator(scan, i, j, j == 0, (j == numScans - 1)));
futures.add(null); // placeholder
}
}
@@ -94,7 +97,7 @@ public class ParallelIterators extends BaseResultIterators {
context.getOverallQueryMetrics().updateNumParallelScans(numScans);
GLOBAL_NUM_PARALLEL_SCANS.update(numScans);
final long renewLeaseThreshold = context.getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
- for (ScanLocator scanLocation : scanLocations) {
+ for (final ScanLocator scanLocation : scanLocations) {
final Scan scan = scanLocation.getScan();
final CombinableMetric scanMetrics = readMetrics.allotMetric(MetricType.SCAN_BYTES, physicalTableName);
final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(readMetrics, physicalTableName);
@@ -105,13 +108,18 @@ public class ParallelIterators extends BaseResultIterators {
@Override
public PeekingResultIterator call() throws Exception {
long startTime = System.currentTimeMillis();
- tableResultItr.initScanner();
if (logger.isDebugEnabled()) {
logger.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan, ScanUtil.getCustomAnnotations(scan)));
}
PeekingResultIterator iterator = iteratorFactory.newIterator(context, tableResultItr, scan, physicalTableName, ParallelIterators.this.plan);
- // Fill the scanner's cache. This helps reduce latency since we are parallelizing the I/O needed.
- iterator.peek();
+ if (initFirstScanOnly) {
+ if ((!isReverse && scanLocation.isFirstScan()) || (isReverse && scanLocation.isLastScan())) {
+ // Fill the scanner's cache. This helps reduce latency since we are parallelizing the I/O needed.
+ iterator.peek();
+ }
+ } else {
+ iterator.peek();
+ }
allIterators.add(iterator);
return iterator;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0bf41f9/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index 9256278..48b763b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -17,8 +17,8 @@
*/
package org.apache.phoenix.iterate;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.CLOSED;
import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.NOT_RENEWED;
import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.RENEWED;
@@ -40,7 +40,6 @@ import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.monitoring.CombinableMetric;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
-import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.Closeables;
@@ -48,6 +47,7 @@ import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.ServerUtil;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0bf41f9/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 711717a..d7f6f2f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -679,7 +679,7 @@ public class ScanUtil {
}
}
- public static boolean isConextScan(Scan scan, StatementContext context) {
+ public static boolean isContextScan(Scan scan, StatementContext context) {
return Bytes.compareTo(context.getScan().getStartRow(), scan.getStartRow()) == 0 && Bytes
.compareTo(context.getScan().getStopRow(), scan.getStopRow()) == 0;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0bf41f9/phoenix-core/src/test/java/org/apache/phoenix/compile/StatementHintsCompilationTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/StatementHintsCompilationTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/StatementHintsCompilationTest.java
index 9adf414..394bf27 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/StatementHintsCompilationTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/StatementHintsCompilationTest.java
@@ -106,12 +106,12 @@ public class StatementHintsCompilationTest extends BaseConnectionlessQueryTest {
@Test
public void testSerialHint() throws Exception {
- // test ScanPlan
+ // test AggregatePlan
String query = "SELECT /*+ SERIAL */ COUNT(*) FROM atable";
assertTrue("Expected a SERIAL query", compileStatement(query).getExplainPlan().getPlanSteps().get(0).contains("SERIAL"));
- // test AggregatePlan
- query = "SELECT /*+ SERIAL */ * FROM atable";
- assertTrue("Expected a SERIAL query", compileStatement(query).getExplainPlan().getPlanSteps().get(0).contains("SERIAL"));
+ // test ScanPlan
+ query = "SELECT /*+ SERIAL */ * FROM atable limit 10";
+ assertTrue("Expected a SERIAL query", compileStatement(query, Collections.emptyList(), 10).getExplainPlan().getPlanSteps().get(0).contains("SERIAL"));
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0bf41f9/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index 1da68ba..05fbf81 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -472,7 +472,7 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest {
return false;
}
- }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()), context.getScan());
+ }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()), context.getScan(), false);
List<KeyRange> keyRanges = parallelIterators.getSplits();
return keyRanges;
}