You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2016/07/01 05:07:51 UTC
phoenix git commit: PHOENIX-3040 Don't use guideposts for executing
queries serially
Repository: phoenix
Updated Branches:
refs/heads/4.x-HBase-0.98 9cddc4631 -> b3a6c0c34
PHOENIX-3040 Don't use guideposts for executing queries serially
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b3a6c0c3
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b3a6c0c3
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b3a6c0c3
Branch: refs/heads/4.x-HBase-0.98
Commit: b3a6c0c3464ab1d24509b9a02478c88f7d2f9666
Parents: 9cddc46
Author: Samarth <sa...@salesforce.com>
Authored: Thu Jun 30 22:07:25 2016 -0700
Committer: Samarth <sa...@salesforce.com>
Committed: Thu Jun 30 22:07:36 2016 -0700
----------------------------------------------------------------------
.../phoenix/compile/ListJarsQueryPlan.java | 5 ++
.../org/apache/phoenix/compile/QueryPlan.java | 1 +
.../apache/phoenix/compile/TraceQueryPlan.java | 5 ++
.../apache/phoenix/execute/AggregatePlan.java | 22 ++++---
.../phoenix/execute/DegenerateQueryPlan.java | 64 --------------------
.../phoenix/execute/DelegateQueryPlan.java | 5 ++
.../execute/LiteralResultIterationPlan.java | 5 ++
.../org/apache/phoenix/execute/ScanPlan.java | 58 ++++++++++++------
.../phoenix/execute/SortMergeJoinPlan.java | 5 ++
.../org/apache/phoenix/execute/UnionPlan.java | 9 +++
.../phoenix/iterate/BaseResultIterators.java | 30 +++------
.../apache/phoenix/jdbc/PhoenixStatement.java | 5 ++
.../org/apache/phoenix/query/QueryServices.java | 5 +-
.../phoenix/query/QueryServicesOptions.java | 1 +
.../query/ParallelIteratorsSplitTest.java | 5 ++
15 files changed, 110 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a6c0c3/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
index 2df0671..9bffad9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
@@ -251,4 +251,9 @@ public class ListJarsQueryPlan implements QueryPlan {
public Operation getOperation() {
return stmt.getUpdateOperation();
}
+
+ @Override
+ public boolean isSerial() {
+ return true;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a6c0c3/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
index 7722483..f403e34 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
@@ -87,4 +87,5 @@ public interface QueryPlan extends StatementPlan {
*/
public boolean useRoundRobinIterator() throws SQLException;
+ public boolean isSerial();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a6c0c3/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
index ed5cda9..aad3188 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
@@ -265,4 +265,9 @@ public class TraceQueryPlan implements QueryPlan {
public boolean useRoundRobinIterator() {
return false;
}
+
+ @Override
+ public boolean isSerial() {
+ return true;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a6c0c3/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 82d854b..c439618 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -61,9 +61,7 @@ import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
-import org.apache.phoenix.schema.SaltingUtil;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.util.ScanUtil;
@@ -83,6 +81,7 @@ public class AggregatePlan extends BaseQueryPlan {
private List<KeyRange> splits;
private List<List<Scan>> scans;
private static final Logger logger = LoggerFactory.getLogger(AggregatePlan.class);
+ private boolean isSerial;
public AggregatePlan(StatementContext context, FilterableStatement statement, TableRef table,
@@ -100,6 +99,12 @@ public class AggregatePlan extends BaseQueryPlan {
orderBy, groupBy, parallelIteratorFactory, dynamicFilter);
this.having = having;
this.aggregators = context.getAggregationManager().getAggregators();
+ boolean hasSerialHint = statement.getHint().hasHint(HintNode.Hint.SERIAL);
+ boolean canBeExecutedSerially = ScanUtil.canQueryBeExecutedSerially(table.getTable(), orderBy, context);
+ if (hasSerialHint && !canBeExecutedSerially) {
+ logger.warn("This query cannot be executed serially. Ignoring the hint");
+ }
+ this.isSerial = hasSerialHint && canBeExecutedSerially;
}
public Expression getHaving() {
@@ -207,13 +212,7 @@ public class AggregatePlan extends BaseQueryPlan {
PInteger.INSTANCE.toBytes(limit + (offset == null ? 0 : offset)));
}
}
- PTable table = tableRef.getTable();
- boolean hasSerialHint = statement.getHint().hasHint(HintNode.Hint.SERIAL);
- boolean canBeExecutedSerially = ScanUtil.canQueryBeExecutedSerially(table, orderBy, context);
- if (hasSerialHint && !canBeExecutedSerially) {
- logger.warn("This query cannot be executed serially. Ignoring the hint");
- }
- BaseResultIterators iterators = hasSerialHint && canBeExecutedSerially
+ BaseResultIterators iterators = isSerial
? new SerialIterators(this, null, null, wrapParallelIteratorFactory(), scanGrouper, scan)
: new ParallelIterators(this, null, wrapParallelIteratorFactory(), scan, false);
@@ -266,4 +265,9 @@ public class AggregatePlan extends BaseQueryPlan {
public boolean useRoundRobinIterator() throws SQLException {
return false;
}
+
+ @Override
+ public boolean isSerial() {
+ return isSerial;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a6c0c3/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
deleted file mode 100644
index 5887ff3..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.execute;
-
-import java.sql.SQLException;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
-import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
-import org.apache.phoenix.compile.RowProjector;
-import org.apache.phoenix.compile.ScanRanges;
-import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.iterate.ParallelScanGrouper;
-import org.apache.phoenix.iterate.ResultIterator;
-import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
-import org.apache.phoenix.parse.FilterableStatement;
-import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.schema.TableRef;
-
-public class DegenerateQueryPlan extends BaseQueryPlan {
-
- public DegenerateQueryPlan(StatementContext context, FilterableStatement statement, TableRef table) {
- super(context, statement, table, RowProjector.EMPTY_PROJECTOR, PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA, null,null, OrderBy.EMPTY_ORDER_BY, GroupBy.EMPTY_GROUP_BY, null, null);
- context.setScanRanges(ScanRanges.NOTHING);
- }
-
- @Override
- public List<KeyRange> getSplits() {
- return Collections.emptyList();
- }
-
- @Override
- public List<List<Scan>> getScans() {
- return Collections.emptyList();
- }
-
- @Override
- protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
- return null;
- }
-
- @Override
- public boolean useRoundRobinIterator() {
- return false;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a6c0c3/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
index 8f0d224..f282aea 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
@@ -124,4 +124,9 @@ public abstract class DelegateQueryPlan implements QueryPlan {
public Integer getOffset() {
return delegate.getOffset();
}
+
+ @Override
+ public boolean isSerial() {
+ return delegate.isSerial();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a6c0c3/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
index db99964..38cb65e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
@@ -111,4 +111,9 @@ public class LiteralResultIterationPlan extends BaseQueryPlan {
return scanner;
}
+ @Override
+ public boolean isSerial() {
+ return true;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a6c0c3/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 31354be..34354f3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -82,6 +82,8 @@ public class ScanPlan extends BaseQueryPlan {
private List<KeyRange> splits;
private List<List<Scan>> scans;
private boolean allowPageFilter;
+ private boolean isSerial;
+ private boolean isDataToScanWithinThreshold;
public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, Integer offset, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) throws SQLException {
this(context, statement, table, projector, limit, offset, orderBy, parallelIteratorFactory, allowPageFilter, null);
@@ -92,11 +94,16 @@ public class ScanPlan extends BaseQueryPlan {
parallelIteratorFactory != null ? parallelIteratorFactory :
buildResultIteratorFactory(context, statement, table, orderBy, limit, offset, allowPageFilter), dynamicFilter);
this.allowPageFilter = allowPageFilter;
- if (!orderBy.getOrderByExpressions().isEmpty()) { // TopN
+ boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
+ if (isOrdered) { // TopN
int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
ScanRegionObserver.serializeIntoScan(context.getScan(), thresholdBytes, limit == null ? -1 : limit, orderBy.getOrderByExpressions(), projector.getEstimatedRowByteSize());
}
+ Integer perScanLimit = !allowPageFilter || isOrdered ? null : limit;
+ perScanLimit = QueryUtil.getOffsetLimit(perScanLimit, offset);
+ this.isDataToScanWithinThreshold = isAmountOfDataToScanWithinThreshold(context, table.getTable(), perScanLimit);
+ this.isSerial = isSerial(context, statement, tableRef, orderBy, isDataToScanWithinThreshold);
}
private static boolean isSerial(StatementContext context, FilterableStatement statement,
@@ -118,23 +125,32 @@ public class ScanPlan extends BaseQueryPlan {
private static boolean isAmountOfDataToScanWithinThreshold(StatementContext context, PTable table, Integer perScanLimit) throws SQLException {
Scan scan = context.getScan();
- /*
- * If a limit is not provided or if we have a filter, then we are not able to decide whether
- * the amount of data we need to scan is less than the threshold.
- */
+ ConnectionQueryServices services = context.getConnection().getQueryServices();
+ long estRowSize = SchemaUtil.estimateRowSize(table);
+ long regionSize = services.getProps().getLong(HConstants.HREGION_MAX_FILESIZE,
+ HConstants.DEFAULT_MAX_FILE_SIZE);
if (perScanLimit == null || scan.getFilter() != null) {
+ /*
+ * If a limit is not provided or if we have a filter, then we are not able to decide whether
+ * the amount of data we need to scan is less than the threshold.
+ */
return false;
+ } else if (perScanLimit != null && scan.getFilter() == null) {
+ /*
+ * In presence of a limit and in absence of a filter, we are not relying on guide post info to
+ * see if we are beyond a threshold.
+ */
+ float factor =
+ services.getProps().getFloat(QueryServices.NONFILTERED_AND_LIMITED_QUERY_SERIAL_THRESHOLD,
+ QueryServicesOptions.DEFAULT_NONFILTERED_LIMITED_QUERY_SERIAL_THRESHOLD);
+ return Float.compare(estRowSize * perScanLimit, factor * regionSize) < 0;
}
long scn = context.getConnection().getSCN() == null ? Long.MAX_VALUE : context.getConnection().getSCN();
PTableStats tableStats = context.getConnection().getQueryServices().getTableStats(table.getName().getBytes(), scn);
GuidePostsInfo gpsInfo = tableStats.getGuidePosts().get(SchemaUtil.getEmptyColumnFamily(table));
- ConnectionQueryServices services = context.getConnection().getQueryServices();
- long estRowSize;
- long estimatedParallelThresholdBytes;
+ long threshold;
if (gpsInfo == null || gpsInfo.getGuidePostsCount() == 0) {
- estRowSize = SchemaUtil.estimateRowSize(table);
- estimatedParallelThresholdBytes = services.getProps().getLong(HConstants.HREGION_MAX_FILESIZE,
- HConstants.DEFAULT_MAX_FILE_SIZE);
+ threshold = regionSize;
} else {
long totByteSize = 0;
long totRowCount = 0;
@@ -145,13 +161,13 @@ public class ScanPlan extends BaseQueryPlan {
totRowCount += rowCount;
}
estRowSize = totByteSize / totRowCount;
- estimatedParallelThresholdBytes = 2
+ threshold = 2
* services.getProps().getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES);
}
- long limitThreshold = services.getProps().getLong(QueryServices.QUERY_PARALLEL_LIMIT_THRESHOLD,
- estimatedParallelThresholdBytes);
- return (perScanLimit * estRowSize < limitThreshold);
+ long thresholdToUse = services.getProps().getLong(QueryServices.FILTERED_OR_NONLIMITED_QUERY_SERIAL_THRESHOLD,
+ threshold);
+ return (perScanLimit * estRowSize < thresholdToUse);
}
private static ParallelIteratorFactory buildResultIteratorFactory(StatementContext context, FilterableStatement statement,
@@ -211,10 +227,7 @@ public class ScanPlan extends BaseQueryPlan {
* limit is provided, run query serially.
*/
boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
- Integer perScanLimit = !allowPageFilter || isOrdered ? null : limit;
- perScanLimit = QueryUtil.getOffsetLimit(perScanLimit, offset);
- boolean isDataWithinThreshold = isAmountOfDataToScanWithinThreshold(context, table, perScanLimit);
- boolean isSerial = isSerial(context, statement, tableRef, orderBy, isDataWithinThreshold);
+ Integer perScanLimit = QueryUtil.getOffsetLimit(!allowPageFilter || isOrdered ? null : limit, offset);
boolean isOffsetOnServer = isOffsetPossibleOnServer(context, orderBy, offset, isSalted, table.getIndexType());
/*
* For queries that are doing a row key order by and are not possibly querying more than a
@@ -223,7 +236,7 @@ public class ScanPlan extends BaseQueryPlan {
*/
boolean initFirstScanOnly =
(orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY || orderBy == OrderBy.REV_ROW_KEY_ORDER_BY)
- && isDataWithinThreshold;
+ && isDataToScanWithinThreshold;
BaseResultIterators iterators;
if (isOffsetOnServer) {
iterators = new SerialIterators(this, perScanLimit, offset, parallelIteratorFactory, scanGrouper, scan);
@@ -280,4 +293,9 @@ public class ScanPlan extends BaseQueryPlan {
return ScanUtil.isRoundRobinPossible(orderBy, context);
}
+ @Override
+ public boolean isSerial() {
+ return isSerial;
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a6c0c3/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index 8e0e6e2..a8ad3eb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -672,4 +672,9 @@ public class SortMergeJoinPlan implements QueryPlan {
return tableRefs;
}
+ @Override
+ public boolean isSerial() {
+ return false;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a6c0c3/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
index cf95b5b..f60f81f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
@@ -231,4 +231,13 @@ public class UnionPlan implements QueryPlan {
}
return sources;
}
+
+ @Override
+ public boolean isSerial() {
+ boolean isSerial = true;
+ for (QueryPlan plan : getPlans()) {
+ isSerial &= plan.isSerial();
+ }
+ return isSerial;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a6c0c3/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 7796a17..e3d512f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -142,15 +142,13 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
}
private boolean useStats() {
- boolean isPointLookup = context.getScanRanges().isPointLookup();
/*
- * Don't use guide posts if:
- * 1) We're doing a point lookup, as HBase is fast enough at those
- * to not need them to be further parallelized. TODO: perf test to verify
- * 2) We're collecting stats, as in this case we need to scan entire
- * regions worth of data to track where to put the guide posts.
+ * Don't use guide posts:
+ * 1) If we're collecting stats, as in this case we need to scan entire
+ * regions worth of data to track where to put the guide posts.
+ * 2) If the query is going to be executed serially.
*/
- if (isPointLookup || ScanUtil.isAnalyzeTable(scan)) {
+ if (ScanUtil.isAnalyzeTable(scan) || plan.isSerial()) {
return false;
}
return true;
@@ -423,11 +421,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
}
private GuidePostsInfo getGuidePosts(Set<byte[]> whereConditions) {
- /*
- * Don't use guide posts if: 1) We're doing a point lookup, as HBase is fast enough at those to not need them to
- * be further parallelized. TODO: pref test to verify 2) We're collecting stats, as in this case we need to scan
- * entire regions worth of data to track where to put the guide posts.
- */
if (!useStats()) { return GuidePostsInfo.NO_GUIDEPOST; }
GuidePostsInfo gps = null;
@@ -629,7 +622,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
} catch (EOFException e) {}
}
byte[] currentKeyBytes = currentKey.copyBytes();
-
// Merge bisect with guideposts for all but the last region
while (regionIndex <= stopIndex) {
HRegionLocation regionLocation = regionLocations.get(regionIndex);
@@ -649,11 +641,9 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
while (guideIndex < gpsSize && (currentGuidePost.compareTo(endKey) <= 0 || endKey.length == 0)) {
Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, currentGuidePostBytes, keyOffset,
false);
- if(newScan != null) {
+ if (newScan != null) {
ScanUtil.setLocalIndexAttributes(newScan, keyOffset, regionInfo.getStartKey(),
regionInfo.getEndKey(), newScan.getStartRow(), newScan.getStopRow());
- }
- if (newScan != null) {
estimatedRows += gps.getRowCounts().get(guideIndex);
estimatedSize += gps.getByteCounts().get(guideIndex);
}
@@ -673,12 +663,12 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
currentKeyBytes = endKey;
regionIndex++;
}
- if (hasGuidePosts) {
- this.estimatedRows = estimatedRows;
- this.estimatedSize = estimatedSize;
- } else if (scanRanges.isPointLookup()) {
+ if (scanRanges.isPointLookup()) {
this.estimatedRows = 1L;
this.estimatedSize = SchemaUtil.estimateRowSize(table);
+ } else if (hasGuidePosts) {
+ this.estimatedRows = estimatedRows;
+ this.estimatedSize = estimatedSize;
} else {
this.estimatedRows = null;
this.estimatedSize = null;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a6c0c3/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 5e0f53c..a2ffae0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -570,6 +570,11 @@ public class PhoenixStatement implements Statement, SQLCloseable {
public boolean useRoundRobinIterator() throws SQLException {
return false;
}
+
+ @Override
+ public boolean isSerial() {
+ return true;
+ }
};
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a6c0c3/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index d82f5a9..c234902 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -213,11 +213,12 @@ public interface QueryServices extends SQLCloseable {
public static final String HCONNECTION_POOL_CORE_SIZE = "hbase.hconnection.threads.core";
public static final String HCONNECTION_POOL_MAX_SIZE = "hbase.hconnection.threads.max";
public static final String HTABLE_MAX_THREADS = "hbase.htable.threads.max";
- public static final String QUERY_PARALLEL_LIMIT_THRESHOLD = "phoenix.query.parallelThresholdBytes";
// time to wait before running second index population upsert select (so that any pending batches of rows on region server are also written to index)
public static final String INDEX_POPULATION_SLEEP_TIME = "phoenix.index.population.wait.time";
public static final String LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB = "phoenix.client.localIndexUpgrade";
-
+ public static final String NONFILTERED_AND_LIMITED_QUERY_SERIAL_THRESHOLD = "phoenix.nonfiltered.limited.query.serial.threshold";
+ public static final String FILTERED_OR_NONLIMITED_QUERY_SERIAL_THRESHOLD = "phoenix.filtered.limited.query.serial.threshold";
+
/**
* Get executor service used for parallel scans
*/
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a6c0c3/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 006bd3c..2b77417 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -245,6 +245,7 @@ public class QueryServicesOptions {
(3 * DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD) / 4;
public static final int DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE = 10;
public static final boolean DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE = true;
+ public static final float DEFAULT_NONFILTERED_LIMITED_QUERY_SERIAL_THRESHOLD = 0.2f;
@SuppressWarnings("serial")
public static final Set<String> DEFAULT_QUERY_SERVER_SKIP_WORDS = new HashSet<String>() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b3a6c0c3/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index 05fbf81..8c65937 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -471,6 +471,11 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest {
public boolean useRoundRobinIterator() {
return false;
}
+
+ @Override
+ public boolean isSerial() {
+ return true;
+ }
}, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()), context.getScan(), false);
List<KeyRange> keyRanges = parallelIterators.getSplits();