You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2018/02/08 07:07:04 UTC
phoenix git commit: PHOENIX-3941 Filter regions to scan for local
indexes based on data table leading pk filter conditions
Repository: phoenix
Updated Branches:
refs/heads/4.x-HBase-1.3 2e919fc5e -> 8ba8d7b6a
PHOENIX-3941 Filter regions to scan for local indexes based on data table leading pk filter conditions
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8ba8d7b6
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8ba8d7b6
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8ba8d7b6
Branch: refs/heads/4.x-HBase-1.3
Commit: 8ba8d7b6a86d6519469780e05508db26be4f0557
Parents: 2e919fc
Author: James Taylor <jt...@salesforce.com>
Authored: Wed Feb 7 23:02:44 2018 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Feb 7 23:05:02 2018 -0800
----------------------------------------------------------------------
.../apache/phoenix/compile/DeleteCompiler.java | 2 +-
.../org/apache/phoenix/compile/ExplainPlan.java | 10 +
.../apache/phoenix/compile/JoinCompiler.java | 10 +-
.../apache/phoenix/compile/PostDDLCompiler.java | 2 +-
.../apache/phoenix/compile/QueryCompiler.java | 20 +-
.../org/apache/phoenix/compile/ScanRanges.java | 12 +-
.../apache/phoenix/compile/UpsertCompiler.java | 4 +-
.../apache/phoenix/execute/AggregatePlan.java | 12 +-
.../apache/phoenix/execute/BaseQueryPlan.java | 6 +-
.../execute/LiteralResultIterationPlan.java | 2 +-
.../org/apache/phoenix/execute/ScanPlan.java | 18 +-
.../phoenix/iterate/BaseResultIterators.java | 226 ++++++++++++++++++-
.../apache/phoenix/iterate/ExplainTable.java | 3 +-
.../phoenix/iterate/ParallelIterators.java | 8 +-
.../apache/phoenix/iterate/SerialIterators.java | 4 +-
.../apache/phoenix/jdbc/PhoenixStatement.java | 2 +-
.../apache/phoenix/optimize/QueryOptimizer.java | 4 +-
.../query/ConnectionlessQueryServicesImpl.java | 8 +-
.../phoenix/compile/QueryCompilerTest.java | 226 +++++++++++++++++++
.../apache/phoenix/query/KeyRangeClipTest.java | 155 +++++++++++++
.../query/ParallelIteratorsSplitTest.java | 2 +-
21 files changed, 674 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ba8d7b6/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index fd80238..54e63d2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -595,7 +595,7 @@ public class DeleteCompiler {
}
final RowProjector projector = projectorToBe;
final QueryPlan aggPlan = new AggregatePlan(context, select, dataPlan.getTableRef(), projector, null, null,
- OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null);
+ OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null, dataPlan);
return new ServerSelectDeleteMutationPlan(dataPlan, connection, aggPlan, projector, maxSize, maxSizeBytes);
} else {
final DeletingParallelIteratorFactory parallelIteratorFactory = parallelIteratorFactoryToBe;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ba8d7b6/phoenix-core/src/main/java/org/apache/phoenix/compile/ExplainPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExplainPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExplainPlan.java
index 2bc7809..ef34daa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExplainPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExplainPlan.java
@@ -34,4 +34,14 @@ public class ExplainPlan {
public List<String> getPlanSteps() {
return planSteps;
}
+
+ @Override
+ public String toString() {
+ StringBuilder buf = new StringBuilder();
+ for (String step : planSteps) {
+ buf.append(step);
+ buf.append('\n');
+ }
+ return buf.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ba8d7b6/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index 439a79b..f3c4c24 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -17,8 +17,8 @@
*/
package org.apache.phoenix.compile;
-import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN;
+import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
import java.sql.SQLException;
import java.util.ArrayList;
@@ -80,8 +80,6 @@ import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PNameFactory;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
-import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
-import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.ProjectedColumn;
@@ -1182,7 +1180,7 @@ public class JoinCompiler {
}
JoinTable join = compile(statement, select, resolver);
if (groupByTableRef != null || orderByTableRef != null) {
- QueryCompiler compiler = new QueryCompiler(statement, select, resolver, false);
+ QueryCompiler compiler = new QueryCompiler(statement, select, resolver, false, null);
List<Object> binds = statement.getParameters();
StatementContext ctx = new StatementContext(statement, resolver, new Scan(), new SequenceManager(statement));
QueryPlan plan = compiler.compileJoinQuery(ctx, binds, join, false, false, null);
@@ -1204,6 +1202,10 @@ public class JoinCompiler {
List<ParseNode> groupBy = tableRef.equals(groupByTableRef) ? select.getGroupBy() : null;
List<OrderByNode> orderBy = tableRef.equals(orderByTableRef) ? select.getOrderBy() : null;
SelectStatement stmt = getSubqueryForOptimizedPlan(select.getHint(), table.getDynamicColumns(), table.getTableSamplingRate(), tableRef, join.getColumnRefs(), table.getPreFiltersCombined(), groupBy, orderBy, table.isWildCardSelect(), select.hasSequence(), select.getUdfParseNodes());
+ // TODO: As port of PHOENIX-4585, we need to make sure this plan has a pointer to the data plan
+ // when an index is used instead of the data table, and that this method returns that
+ // state for downstream processing.
+ // TODO: It seems inefficient to be recompiling the statement again and again inside of this optimize call
QueryPlan plan = statement.getConnection().getQueryServices().getOptimizer().optimize(statement, stmt);
if (!plan.getTableRef().equals(tableRef)) {
replacement.put(tableRef, plan.getTableRef());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ba8d7b6/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
index e5ed6a5..709534e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
@@ -283,7 +283,7 @@ public class PostDDLCompiler {
continue;
}
QueryPlan plan = new AggregatePlan(context, select, tableRef, projector, null, null,
- OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null);
+ OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null, null);
try {
ResultIterator iterator = plan.iterator();
try {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ba8d7b6/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 287f9e0..3b14850 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -104,16 +104,13 @@ public class QueryCompiler {
private final boolean projectTuples;
private final boolean useSortMergeJoin;
private final boolean noChildParentJoinOptimization;
+ private final QueryPlan dataPlan;
- public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver) throws SQLException {
- this(statement, select, resolver, Collections.<PDatum>emptyList(), null, new SequenceManager(statement), true);
+ public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, boolean projectTuples, QueryPlan dataPlan) throws SQLException {
+ this(statement, select, resolver, Collections.<PDatum>emptyList(), null, new SequenceManager(statement), projectTuples, dataPlan);
}
- public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, boolean projectTuples) throws SQLException {
- this(statement, select, resolver, Collections.<PDatum>emptyList(), null, new SequenceManager(statement), projectTuples);
- }
-
- public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, SequenceManager sequenceManager, boolean projectTuples) throws SQLException {
+ public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, SequenceManager sequenceManager, boolean projectTuples, QueryPlan dataPlan) throws SQLException {
this.statement = statement;
this.select = select;
this.resolver = resolver;
@@ -133,10 +130,11 @@ public class QueryCompiler {
scan.setCaching(statement.getFetchSize());
this.originalScan = ScanUtil.newScan(scan);
+ this.dataPlan = dataPlan;
}
public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, SequenceManager sequenceManager) throws SQLException {
- this(statement, select, resolver, targetColumns, parallelIteratorFactory, sequenceManager, true);
+ this(statement, select, resolver, targetColumns, parallelIteratorFactory, sequenceManager, true, null);
}
/**
@@ -495,7 +493,7 @@ public class QueryCompiler {
}
int maxRows = this.statement.getMaxRows();
this.statement.setMaxRows(pushDownMaxRows ? maxRows : 0); // overwrite maxRows to avoid its impact on inner queries.
- QueryPlan plan = new QueryCompiler(this.statement, subquery, resolver, false).compile();
+ QueryPlan plan = new QueryCompiler(this.statement, subquery, resolver, false, dataPlan).compile();
plan = statement.getConnection().getQueryServices().getOptimizer().optimize(statement, plan);
this.statement.setMaxRows(maxRows); // restore maxRows.
return plan;
@@ -586,9 +584,9 @@ public class QueryCompiler {
parallelIteratorFactory)
: (select.isAggregate() || select.isDistinct()
? new AggregatePlan(context, select, tableRef, projector, limit, offset, orderBy,
- parallelIteratorFactory, groupBy, having)
+ parallelIteratorFactory, groupBy, having, dataPlan)
: new ScanPlan(context, select, tableRef, projector, limit, offset, orderBy,
- parallelIteratorFactory, allowPageFilter));
+ parallelIteratorFactory, allowPageFilter, dataPlan));
}
if (!subqueries.isEmpty()) {
int count = subqueries.size();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ba8d7b6/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
index 18e575c..8c71248 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
@@ -147,8 +147,10 @@ public class ScanRanges {
scanRange = KeyRange.getKeyRange(minKey, maxKey);
}
if (minMaxRange != KeyRange.EVERYTHING_RANGE) {
- minMaxRange = ScanUtil.convertToInclusiveExclusiveRange(minMaxRange, schema, new ImmutableBytesWritable());
- scanRange = scanRange.intersect(minMaxRange);
+ // Intersect using modified min/max range, but keep original range to ensure it
+ // can still be decomposed into it's parts
+ KeyRange inclusiveExclusiveMinMaxRange = ScanUtil.convertToInclusiveExclusiveRange(minMaxRange, schema, new ImmutableBytesWritable());
+ scanRange = scanRange.intersect(inclusiveExclusiveMinMaxRange);
}
if (scanRange == KeyRange.EMPTY_RANGE) {
@@ -573,7 +575,7 @@ public class ScanRanges {
}
public int getBoundPkColumnCount() {
- return this.useSkipScanFilter ? ScanUtil.getRowKeyPosition(slotSpan, ranges.size()) : Math.max(getBoundPkSpan(ranges, slotSpan), getBoundMinMaxSlotCount());
+ return Math.max(getBoundPkSpan(ranges, slotSpan), getBoundMinMaxSlotCount());
}
private int getBoundMinMaxSlotCount() {
@@ -625,6 +627,10 @@ public class ScanRanges {
public int[] getSlotSpans() {
return slotSpan;
}
+
+ public KeyRange getScanRange() {
+ return scanRange;
+ }
public boolean hasEqualityConstraint(int pkPosition) {
int pkOffset = 0;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ba8d7b6/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index a81a427..08133a4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -549,7 +549,7 @@ public class UpsertCompiler {
select = SelectStatement.create(select, hint);
// Pass scan through if same table in upsert and select so that projection is computed correctly
// Use optimizer to choose the best plan
- QueryCompiler compiler = new QueryCompiler(statement, select, selectResolver, targetColumns, parallelIteratorFactoryToBe, new SequenceManager(statement), false);
+ QueryCompiler compiler = new QueryCompiler(statement, select, selectResolver, targetColumns, parallelIteratorFactoryToBe, new SequenceManager(statement), false, null);
queryPlanToBe = compiler.compile();
// This is post-fix: if the tableRef is a projected table, this means there are post-processing
// steps and parallelIteratorFactory did not take effect.
@@ -697,7 +697,7 @@ public class UpsertCompiler {
scan.setAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS, UngroupedAggregateRegionObserver.serialize(projectedExpressions));
// Ignore order by - it has no impact
- final QueryPlan aggPlan = new AggregatePlan(context, select, statementContext.getCurrentTable(), aggProjector, null,null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null);
+ final QueryPlan aggPlan = new AggregatePlan(context, select, statementContext.getCurrentTable(), aggProjector, null,null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null, originalQueryPlan);
return new ServerUpsertSelectMutationPlan(queryPlan, tableRef, originalQueryPlan, context, connection, scan, aggPlan, aggProjector, maxSize, maxSizeBytes);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ba8d7b6/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 369769e..2e042e7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -90,17 +90,17 @@ public class AggregatePlan extends BaseQueryPlan {
public AggregatePlan(StatementContext context, FilterableStatement statement, TableRef table,
RowProjector projector, Integer limit, Integer offset, OrderBy orderBy,
- ParallelIteratorFactory parallelIteratorFactory, GroupBy groupBy, Expression having) throws SQLException {
+ ParallelIteratorFactory parallelIteratorFactory, GroupBy groupBy, Expression having, QueryPlan dataPlan) throws SQLException {
this(context, statement, table, projector, limit, offset, orderBy, parallelIteratorFactory, groupBy, having,
- null);
+ null, dataPlan);
}
private AggregatePlan(StatementContext context, FilterableStatement statement, TableRef table,
RowProjector projector, Integer limit, Integer offset, OrderBy orderBy,
ParallelIteratorFactory parallelIteratorFactory, GroupBy groupBy, Expression having,
- Expression dynamicFilter) throws SQLException {
+ Expression dynamicFilter, QueryPlan dataPlan) throws SQLException {
super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, offset,
- orderBy, groupBy, parallelIteratorFactory, dynamicFilter);
+ orderBy, groupBy, parallelIteratorFactory, dynamicFilter, dataPlan);
this.having = having;
this.aggregators = context.getAggregationManager().getAggregators();
boolean hasSerialHint = statement.getHint().hasHint(HintNode.Hint.SERIAL);
@@ -251,8 +251,8 @@ public class AggregatePlan extends BaseQueryPlan {
}
}
BaseResultIterators iterators = isSerial
- ? new SerialIterators(this, null, null, wrapParallelIteratorFactory(), scanGrouper, scan, caches)
- : new ParallelIterators(this, null, wrapParallelIteratorFactory(), scan, false, caches);
+ ? new SerialIterators(this, null, null, wrapParallelIteratorFactory(), scanGrouper, scan, caches, dataPlan)
+ : new ParallelIterators(this, null, wrapParallelIteratorFactory(), scan, false, caches, dataPlan);
estimatedRows = iterators.getEstimatedRowCount();
estimatedSize = iterators.getEstimatedByteCount();
estimateInfoTimestamp = iterators.getEstimateInfoTimestamp();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ba8d7b6/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index 380037f..0bc606e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -63,8 +63,6 @@ import org.apache.phoenix.parse.HintNode.Hint;
import org.apache.phoenix.parse.ParseNodeFactory;
import org.apache.phoenix.parse.TableName;
import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.KeyValueSchema;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PName;
@@ -114,6 +112,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
* immediately before creating the ResultIterator.
*/
protected final Expression dynamicFilter;
+ protected final QueryPlan dataPlan;
protected Long estimatedRows;
protected Long estimatedSize;
protected Long estimateInfoTimestamp;
@@ -124,7 +123,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
StatementContext context, FilterableStatement statement, TableRef table,
RowProjector projection, ParameterMetaData paramMetaData, Integer limit, Integer offset, OrderBy orderBy,
GroupBy groupBy, ParallelIteratorFactory parallelIteratorFactory,
- Expression dynamicFilter) {
+ Expression dynamicFilter, QueryPlan dataPlan) {
this.context = context;
this.statement = statement;
this.tableRef = table;
@@ -137,6 +136,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
this.groupBy = groupBy;
this.parallelIteratorFactory = parallelIteratorFactory;
this.dynamicFilter = dynamicFilter;
+ this.dataPlan = dataPlan;
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ba8d7b6/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
index 1d1332d..c9abb69 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
@@ -56,7 +56,7 @@ public class LiteralResultIterationPlan extends BaseQueryPlan {
public LiteralResultIterationPlan(Iterable<Tuple> tuples, StatementContext context,
FilterableStatement statement, TableRef tableRef, RowProjector projection,
Integer limit, Integer offset, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory) throws SQLException {
- super(context, statement, tableRef, projection, context.getBindManager().getParameterMetaData(), limit, offset, orderBy, GroupBy.EMPTY_GROUP_BY, parallelIteratorFactory, null);
+ super(context, statement, tableRef, projection, context.getBindManager().getParameterMetaData(), limit, offset, orderBy, GroupBy.EMPTY_GROUP_BY, parallelIteratorFactory, null, null);
this.tuples = tuples;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ba8d7b6/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 31d7097..d63950c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
@@ -93,14 +94,17 @@ public class ScanPlan extends BaseQueryPlan {
private Long serialBytesEstimate;
private Long serialEstimateInfoTs;
- public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, Integer offset, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) throws SQLException {
- this(context, statement, table, projector, limit, offset, orderBy, parallelIteratorFactory, allowPageFilter, null);
+ public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit,
+ Integer offset, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter,
+ QueryPlan dataPlan) throws SQLException {
+ this(context, statement, table, projector, limit, offset, orderBy, parallelIteratorFactory, allowPageFilter, null, dataPlan);
}
- private ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, Integer offset, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, Expression dynamicFilter) throws SQLException {
+ private ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, Integer offset,
+ OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, Expression dynamicFilter, QueryPlan dataPlan) throws SQLException {
super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit,offset, orderBy, GroupBy.EMPTY_GROUP_BY,
parallelIteratorFactory != null ? parallelIteratorFactory :
- buildResultIteratorFactory(context, statement, table, orderBy, limit, offset, allowPageFilter), dynamicFilter);
+ buildResultIteratorFactory(context, statement, table, orderBy, limit, offset, allowPageFilter), dynamicFilter, dataPlan);
this.allowPageFilter = allowPageFilter;
boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
if (isOrdered) { // TopN
@@ -260,11 +264,11 @@ public class ScanPlan extends BaseQueryPlan {
&& isDataToScanWithinThreshold;
BaseResultIterators iterators;
if (isOffsetOnServer) {
- iterators = new SerialIterators(this, perScanLimit, offset, parallelIteratorFactory, scanGrouper, scan, caches);
+ iterators = new SerialIterators(this, perScanLimit, offset, parallelIteratorFactory, scanGrouper, scan, caches, dataPlan);
} else if (isSerial) {
- iterators = new SerialIterators(this, perScanLimit, null, parallelIteratorFactory, scanGrouper, scan, caches);
+ iterators = new SerialIterators(this, perScanLimit, null, parallelIteratorFactory, scanGrouper, scan, caches, dataPlan);
} else {
- iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory, scanGrouper, scan, initFirstScanOnly, caches);
+ iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory, scanGrouper, scan, initFirstScanOnly, caches, dataPlan);
}
estimatedRows = iterators.getEstimatedRowCount();
estimatedSize = iterators.getEstimatedByteCount();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ba8d7b6/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index bd67fa8..25722a9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -37,6 +37,7 @@ import java.io.DataInputStream;
import java.io.EOFException;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.Iterator;
@@ -93,6 +94,7 @@ import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
+import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PColumnFamily;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
@@ -101,9 +103,11 @@ import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
import org.apache.phoenix.schema.PTable.ViewType;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.RowKeySchema;
import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.ValueSchema.Field;
import org.apache.phoenix.schema.stats.GuidePostsInfo;
import org.apache.phoenix.schema.stats.GuidePostsKey;
import org.apache.phoenix.schema.stats.StatisticsUtil;
@@ -157,6 +161,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
private Scan scan;
private final boolean useStatsForParallelization;
protected Map<ImmutableBytesPtr,ServerCache> caches;
+ private final QueryPlan dataPlan;
static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new Function<HRegionLocation, KeyRange>() {
@Override
@@ -473,13 +478,14 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
}
}
- public BaseResultIterators(QueryPlan plan, Integer perScanLimit, Integer offset, ParallelScanGrouper scanGrouper, Scan scan, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException {
+ public BaseResultIterators(QueryPlan plan, Integer perScanLimit, Integer offset, ParallelScanGrouper scanGrouper, Scan scan, Map<ImmutableBytesPtr,ServerCache> caches, QueryPlan dataPlan) throws SQLException {
super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), plan.getOrderBy(),
plan.getStatement().getHint(), QueryUtil.getOffsetLimit(plan.getLimit(), plan.getOffset()), offset);
this.plan = plan;
this.scan = scan;
this.caches = caches;
this.scanGrouper = scanGrouper;
+ this.dataPlan = dataPlan;
StatementContext context = plan.getContext();
// Clone MutationState as the one on the connection will change if auto commit is on
// yet we need the original one with the original transaction from TableResultIterator.
@@ -681,6 +687,173 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
private long rowsEstimate;
}
+ private int computeColumnsInCommon() {
+ PTable dataTable;
+ if ((dataTable=dataPlan.getTableRef().getTable()).getBucketNum() != null) { // unable to compute prefix range for salted data table
+ return 0;
+ }
+
+ PTable table = getTable();
+ int nColumnsOffset = dataTable.isMultiTenant() ? 1 :0;
+ int nColumnsInCommon = nColumnsOffset;
+ List<PColumn> dataPKColumns = dataTable.getPKColumns();
+ List<PColumn> indexPKColumns = table.getPKColumns();
+ int nIndexPKColumns = indexPKColumns.size();
+ int nDataPKColumns = dataPKColumns.size();
+ // Skip INDEX_ID and tenant ID columns
+ for (int i = 1 + nColumnsInCommon; i < nIndexPKColumns; i++) {
+ PColumn indexColumn = indexPKColumns.get(i);
+ String indexColumnName = indexColumn.getName().getString();
+ String cf = IndexUtil.getDataColumnFamilyName(indexColumnName);
+ if (cf.length() != 0) {
+ break;
+ }
+ if (i > nDataPKColumns) {
+ break;
+ }
+ PColumn dataColumn = dataPKColumns.get(i-1);
+ String dataColumnName = dataColumn.getName().getString();
+ // Ensure both name and type are the same. Because of the restrictions we have
+ // on PK column types (namely that you can only have a fixed width nullable
+ // column as your last column), the type check is more of a sanity check
+ // since it wouldn't make sense to have an index with every column in common.
+ if (indexColumn.getDataType() == dataColumn.getDataType()
+ && dataColumnName.equals(IndexUtil.getDataColumnName(indexColumnName))) {
+ nColumnsInCommon++;
+ continue;
+ }
+ break;
+ }
+ return nColumnsInCommon;
+ }
+
+ // public for testing
+ public static ScanRanges computePrefixScanRanges(ScanRanges dataScanRanges, int nColumnsInCommon) {
+ if (nColumnsInCommon == 0) {
+ return ScanRanges.EVERYTHING;
+ }
+
+ int offset = 0;
+ List<List<KeyRange>> cnf = Lists.newArrayListWithExpectedSize(nColumnsInCommon);
+ int[] slotSpan = new int[nColumnsInCommon];
+ boolean useSkipScan = false;
+ boolean hasRange = false;
+ List<List<KeyRange>> rangesList = dataScanRanges.getRanges();
+ int rangesListSize = rangesList.size();
+ while (offset < nColumnsInCommon && offset < rangesListSize) {
+ List<KeyRange> ranges = rangesList.get(offset);
+ // We use a skip scan if we have multiple ranges or if
+ // we have a non single key range before the last range.
+ useSkipScan |= ranges.size() > 1 || hasRange;
+ cnf.add(ranges);
+ int rangeSpan = 1 + dataScanRanges.getSlotSpans()[offset];
+ if (offset + rangeSpan > nColumnsInCommon) {
+ rangeSpan = nColumnsInCommon - offset;
+ // trim range to only be rangeSpan in length
+ ranges = Lists.newArrayListWithExpectedSize(cnf.get(cnf.size()-1).size());
+ for (KeyRange range : cnf.get(cnf.size()-1)) {
+ range = clipRange(dataScanRanges.getSchema(), offset, rangeSpan, range);
+ // trim range to be only rangeSpan in length
+ ranges.add(range);
+ }
+ cnf.set(cnf.size()-1, ranges);
+ }
+ for (KeyRange range : ranges) {
+ if (!range.isSingleKey()) {
+ hasRange = true;
+ }
+ }
+ slotSpan[offset] = rangeSpan - 1;
+ offset = offset + rangeSpan;
+ }
+ useSkipScan &= dataScanRanges.useSkipScanFilter();
+ KeyRange minMaxRange =
+ clipRange(dataScanRanges.getSchema(), 0, nColumnsInCommon, dataScanRanges.getMinMaxRange());
+ slotSpan = slotSpan.length == cnf.size() ? slotSpan : Arrays.copyOf(slotSpan, cnf.size());
+ ScanRanges commonScanRanges = ScanRanges.create(dataScanRanges.getSchema(), cnf, slotSpan, minMaxRange, null, useSkipScan, -1);
+ return commonScanRanges;
+ }
+
+ /**
+ * Truncates range to be a max of rangeSpan fields
+ * @param schema row key schema
+ * @param fieldIndex starting index of field with in the row key schema
+ * @param rangeSpan maximum field length
+ * @return the same range if unchanged and otherwise a new range
+ */
+ public static KeyRange clipRange(RowKeySchema schema, int fieldIndex, int rangeSpan, KeyRange range) {
+ if (range == KeyRange.EVERYTHING_RANGE) {
+ return range;
+ }
+ if (range == KeyRange.EMPTY_RANGE) {
+ return range;
+ }
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ boolean newRange = false;
+ boolean lowerUnbound = range.lowerUnbound();
+ boolean lowerInclusive = range.isLowerInclusive();
+ byte[] lowerRange = range.getLowerRange();
+ if (!lowerUnbound && lowerRange.length > 0) {
+ if (clipKeyRangeBytes(schema, fieldIndex, rangeSpan, lowerRange, ptr, true)) {
+ // Make lower range inclusive since we're decreasing the range by chopping the last part off
+ lowerInclusive = true;
+ lowerRange = ptr.copyBytes();
+ newRange = true;
+ }
+ }
+ boolean upperUnbound = range.upperUnbound();
+ boolean upperInclusive = range.isUpperInclusive();
+ byte[] upperRange = range.getUpperRange();
+ if (!upperUnbound && upperRange.length > 0) {
+ if (clipKeyRangeBytes(schema, fieldIndex, rangeSpan, upperRange, ptr, false)) {
+ // Make lower range inclusive since we're decreasing the range by chopping the last part off
+ upperInclusive = true;
+ upperRange = ptr.copyBytes();
+ newRange = true;
+ }
+ }
+
+ return newRange ? KeyRange.getKeyRange(lowerRange, lowerInclusive, upperRange, upperInclusive) : range;
+ }
+
+ private static boolean clipKeyRangeBytes(RowKeySchema schema, int fieldIndex, int rangeSpan, byte[] rowKey, ImmutableBytesWritable ptr, boolean trimTrailingNulls) {
+ int position = 0;
+ int maxOffset = schema.iterator(rowKey, ptr);
+ byte[] newRowKey = new byte[rowKey.length];
+ int offset = 0;
+ int trailingNullsToTrim = 0;
+ do {
+ if (schema.next(ptr, fieldIndex, maxOffset) == null) {
+ break;
+ }
+ System.arraycopy(ptr.get(), ptr.getOffset(), newRowKey, offset, ptr.getLength());
+ offset += ptr.getLength();
+ Field field = schema.getField(fieldIndex);
+ if (field.getDataType().isFixedWidth()) {
+ trailingNullsToTrim = 0;
+ } else {
+ boolean isNull = ptr.getLength() == 0;
+ byte sepByte = SchemaUtil.getSeparatorByte(true, isNull, field);
+ newRowKey[offset++] = sepByte;
+ if (isNull) {
+ if (trimTrailingNulls) {
+ trailingNullsToTrim++;
+ } else {
+ trailingNullsToTrim = 0;
+ }
+ } else {
+ // So that last zero separator byte is always trimmed
+ trailingNullsToTrim = 1;
+ }
+ }
+ fieldIndex++;
+ } while (++position < rangeSpan);
+ // remove trailing nulls
+ ptr.set(newRowKey, 0, offset - trailingNullsToTrim);
+ // return true if we've clipped the rowKey
+ return maxOffset != offset;
+ }
+
/**
* Compute the list of parallel scans to run for a given query. The inner scans
* may be concatenated together directly, while the other ones may need to be
@@ -702,26 +875,43 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
// case we generate an empty guide post with the byte estimate being set as guide post
// width.
boolean emptyGuidePost = gps.isEmptyGuidePost();
+ byte[] startRegionBoundaryKey = startKey;
+ byte[] stopRegionBoundaryKey = stopKey;
+ int columnsInCommon = 0;
+ ScanRanges prefixScanRanges = ScanRanges.EVERYTHING;
boolean traverseAllRegions = isSalted || isLocalIndex;
- if (!traverseAllRegions) {
+ if (isLocalIndex) {
+ // TODO: when implementing PHOENIX-4585, we should change this to an assert
+ // as we should always have a data plan when a local index is being used.
+ if (dataPlan != null && dataPlan.getTableRef().getTable().getType() != PTableType.INDEX) { // Sanity check
+ prefixScanRanges = computePrefixScanRanges(dataPlan.getContext().getScanRanges(), columnsInCommon=computeColumnsInCommon());
+ KeyRange prefixRange = prefixScanRanges.getScanRange();
+ if (!prefixRange.lowerUnbound()) {
+ startRegionBoundaryKey = prefixRange.getLowerRange();
+ }
+ if (!prefixRange.upperUnbound()) {
+ stopRegionBoundaryKey = prefixRange.getUpperRange();
+ }
+ }
+ } else if (!traverseAllRegions) {
byte[] scanStartRow = scan.getStartRow();
if (scanStartRow.length != 0 && Bytes.compareTo(scanStartRow, startKey) > 0) {
- startKey = scanStartRow;
+ startRegionBoundaryKey = startKey = scanStartRow;
}
byte[] scanStopRow = scan.getStopRow();
if (stopKey.length == 0
|| (scanStopRow.length != 0 && Bytes.compareTo(scanStopRow, stopKey) < 0)) {
- stopKey = scanStopRow;
+ stopRegionBoundaryKey = stopKey = scanStopRow;
}
}
int regionIndex = 0;
int stopIndex = regionBoundaries.size();
- if (startKey.length > 0) {
- regionIndex = getIndexContainingInclusive(regionBoundaries, startKey);
+ if (startRegionBoundaryKey.length > 0) {
+ regionIndex = getIndexContainingInclusive(regionBoundaries, startRegionBoundaryKey);
}
- if (stopKey.length > 0) {
- stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), stopKey));
+ if (stopRegionBoundaryKey.length > 0) {
+ stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), stopRegionBoundaryKey));
if (isLocalIndex) {
stopKey = regionLocations.get(stopIndex).getRegionInfo().getEndKey();
}
@@ -771,15 +961,29 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
HRegionLocation regionLocation = regionLocations.get(regionIndex);
HRegionInfo regionInfo = regionLocation.getRegionInfo();
byte[] currentGuidePostBytes = currentGuidePost.copyBytes();
- byte[] endKey, endRegionKey = EMPTY_BYTE_ARRAY;
+ byte[] endKey;
if (regionIndex == stopIndex) {
endKey = stopKey;
} else {
endKey = regionBoundaries.get(regionIndex);
}
if (isLocalIndex) {
- endRegionKey = regionInfo.getEndKey();
- keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), endRegionKey);
+ // Only attempt further pruning if the prefix range is using
+ // a skip scan since we've already pruned the range of regions
+ // based on the start/stop key.
+ if (columnsInCommon > 0 && prefixScanRanges.useSkipScanFilter()) {
+ byte[] regionStartKey = regionInfo.getStartKey();
+ ImmutableBytesWritable ptr = context.getTempPtr();
+ clipKeyRangeBytes(prefixScanRanges.getSchema(), 0, columnsInCommon, regionStartKey, ptr, false);
+ regionStartKey = ByteUtil.copyKeyBytesIfNecessary(ptr);
+ // Prune this region if there's no intersection
+ if (!prefixScanRanges.intersectRegion(regionStartKey, regionInfo.getEndKey(), false)) {
+ currentKeyBytes = endKey;
+ regionIndex++;
+ continue;
+ }
+ }
+ keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), regionInfo.getEndKey());
}
byte[] initialKeyBytes = currentKeyBytes;
while (intersectWithGuidePosts && (endKey.length == 0 || currentGuidePost.compareTo(endKey) <= 0)) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ba8d7b6/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
index 06ac3c0..265e213 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
@@ -265,7 +265,8 @@ public abstract class ExplainTable {
if (minMaxRange != KeyRange.EVERYTHING_RANGE) {
RowKeySchema schema = tableRef.getTable().getRowKeySchema();
if (!minMaxRange.isUnbound(bound)) {
- minMaxIterator = new RowKeyValueIterator(schema, minMaxRange.getRange(bound));
+ // Use scan ranges from ScanRanges since it will have been intersected with minMaxRange
+ minMaxIterator = new RowKeyValueIterator(schema, scanRanges.getScanRange().getRange(bound));
}
}
boolean isLocalIndex = ScanUtil.isLocalIndex(context.getScan());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ba8d7b6/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index 3c11f4a..3a4b084 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -57,16 +57,16 @@ public class ParallelIterators extends BaseResultIterators {
private final ParallelIteratorFactory iteratorFactory;
private final boolean initFirstScanOnly;
- public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan, boolean initFirstScanOnly, Map<ImmutableBytesPtr,ServerCache> caches)
+ public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan, boolean initFirstScanOnly, Map<ImmutableBytesPtr,ServerCache> caches, QueryPlan dataPlan)
throws SQLException {
- super(plan, perScanLimit, null, scanGrouper, scan,caches);
+ super(plan, perScanLimit, null, scanGrouper, scan,caches, dataPlan);
this.iteratorFactory = iteratorFactory;
this.initFirstScanOnly = initFirstScanOnly;
}
- public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, Scan scan, boolean initOneScanPerRegion, Map<ImmutableBytesPtr,ServerCache> caches)
+ public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, Scan scan, boolean initOneScanPerRegion, Map<ImmutableBytesPtr,ServerCache> caches, QueryPlan dataPlan)
throws SQLException {
- this(plan, perScanLimit, iteratorFactory, DefaultParallelScanGrouper.getInstance(), scan, initOneScanPerRegion, caches);
+ this(plan, perScanLimit, iteratorFactory, DefaultParallelScanGrouper.getInstance(), scan, initOneScanPerRegion, caches, dataPlan);
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ba8d7b6/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index 26d1ed1..f94a7c9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -62,9 +62,9 @@ public class SerialIterators extends BaseResultIterators {
private final Integer offset;
public SerialIterators(QueryPlan plan, Integer perScanLimit, Integer offset,
- ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan, Map<ImmutableBytesPtr,ServerCache> caches)
+ ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan, Map<ImmutableBytesPtr,ServerCache> caches, QueryPlan dataPlan)
throws SQLException {
- super(plan, perScanLimit, offset, scanGrouper, scan, caches);
+ super(plan, perScanLimit, offset, scanGrouper, scan, caches, dataPlan);
this.offset = offset;
// must be a offset or a limit specified or a SERIAL hint
Preconditions.checkArgument(
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ba8d7b6/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index b637173..6d203c9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -476,7 +476,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
select = StatementNormalizer.normalize(transformedSelect, resolver);
}
- QueryPlan plan = new QueryCompiler(stmt, select, resolver, Collections.<PDatum>emptyList(), stmt.getConnection().getIteratorFactory(), new SequenceManager(stmt), true).compile();
+ QueryPlan plan = new QueryCompiler(stmt, select, resolver, Collections.<PDatum>emptyList(), stmt.getConnection().getIteratorFactory(), new SequenceManager(stmt), true, null).compile();
plan.getContext().getSequenceManager().validateSequences(seqAction);
return plan;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ba8d7b6/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index 3f5f5ed..8481bc5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -242,7 +242,7 @@ public class QueryOptimizer {
try {
// translate nodes that match expressions that are indexed to the associated column parse node
indexSelect = ParseNodeRewriter.rewrite(indexSelect, new IndexExpressionParseNodeRewriter(index, null, statement.getConnection(), indexSelect.getUdfParseNodes()));
- QueryCompiler compiler = new QueryCompiler(statement, indexSelect, resolver, targetColumns, parallelIteratorFactory, dataPlan.getContext().getSequenceManager(), isProjected);
+ QueryCompiler compiler = new QueryCompiler(statement, indexSelect, resolver, targetColumns, parallelIteratorFactory, dataPlan.getContext().getSequenceManager(), isProjected, dataPlan);
QueryPlan plan = compiler.compile();
// If query doesn't have where clause and some of columns to project are missing
@@ -314,7 +314,7 @@ public class QueryOptimizer {
query = SubqueryRewriter.transform(query, queryResolver, statement.getConnection());
queryResolver = FromCompiler.getResolverForQuery(query, statement.getConnection());
query = StatementNormalizer.normalize(query, queryResolver);
- QueryPlan plan = new QueryCompiler(statement, query, queryResolver, targetColumns, parallelIteratorFactory, dataPlan.getContext().getSequenceManager(), isProjected).compile();
+ QueryPlan plan = new QueryCompiler(statement, query, queryResolver, targetColumns, parallelIteratorFactory, dataPlan.getContext().getSequenceManager(), isProjected, dataPlan).compile();
return plan;
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ba8d7b6/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 3154f86..d25299a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -81,6 +81,7 @@ import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.stats.GuidePostsInfo;
import org.apache.phoenix.schema.stats.GuidePostsKey;
import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.JDBCUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
@@ -237,7 +238,12 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
public MetaDataMutationResult createTable(List<Mutation> tableMetaData, byte[] physicalName, PTableType tableType,
Map<String, Object> tableProps, List<Pair<byte[], Map<String, Object>>> families, byte[][] splits,
boolean isNamespaceMapped, boolean allocateIndexId) throws SQLException {
- if (splits != null) {
+ if (tableType == PTableType.INDEX && IndexUtil.isLocalIndexFamily(Bytes.toString(families.iterator().next().getFirst()))) {
+ Object dataTableName = tableProps.get(PhoenixDatabaseMetaData.DATA_TABLE_NAME);
+ List<HRegionLocation> regionLocations = tableSplits.get(dataTableName);
+ byte[] tableName = getTableName(tableMetaData, physicalName);
+ tableSplits.put(Bytes.toString(tableName), regionLocations);
+ } else if (splits != null) {
byte[] tableName = getTableName(tableMetaData, physicalName);
tableSplits.put(Bytes.toString(tableName), generateRegionLocations(tableName, splits));
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ba8d7b6/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index 5a672ba..1d61003 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -4161,4 +4161,230 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
assertEquals(e.getErrorCode(), SQLExceptionCode.CONNECTION_CLOSED.getErrorCode());
}
}
+
+ @Test
+ public void testSingleColLocalIndexPruning() throws SQLException {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute("CREATE TABLE T (\n" +
+ " A CHAR(1) NOT NULL,\n" +
+ " B CHAR(1) NOT NULL,\n" +
+ " C CHAR(1) NOT NULL,\n" +
+ " CONSTRAINT PK PRIMARY KEY (\n" +
+ " A,\n" +
+ " B,\n" +
+ " C\n" +
+ " )\n" +
+ ") SPLIT ON ('A','C','E','G','I')");
+ conn.createStatement().execute("CREATE LOCAL INDEX IDX ON T(A,C)");
+ String query = "SELECT * FROM T WHERE A = 'B' and C='C'";
+ PhoenixStatement statement = conn.createStatement().unwrap(PhoenixStatement.class);
+ QueryPlan plan = statement.optimizeQuery(query);
+ assertEquals("IDX", plan.getContext().getCurrentTable().getTable().getName().getString());
+ plan.iterator();
+ List<List<Scan>> outerScans = plan.getScans();
+ assertEquals(1, outerScans.size());
+ List<Scan> innerScans = outerScans.get(0);
+ assertEquals(1, innerScans.size());
+ Scan scan = innerScans.get(0);
+ assertEquals("A", Bytes.toString(scan.getStartRow()).trim());
+ assertEquals("C", Bytes.toString(scan.getStopRow()).trim());
+ }
+ }
+
+ @Test
+ public void testMultiColLocalIndexPruning() throws SQLException {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute("CREATE TABLE T (\n" +
+ " A CHAR(1) NOT NULL,\n" +
+ " B CHAR(1) NOT NULL,\n" +
+ " C CHAR(1) NOT NULL,\n" +
+ " D CHAR(1) NOT NULL,\n" +
+ " CONSTRAINT PK PRIMARY KEY (\n" +
+ " A,\n" +
+ " B,\n" +
+ " C,\n" +
+ " D\n" +
+ " )\n" +
+ ") SPLIT ON ('A','C','E','G','I')");
+ conn.createStatement().execute("CREATE LOCAL INDEX IDX ON T(A,B,D)");
+ String query = "SELECT * FROM T WHERE A = 'C' and B = 'X' and D='C'";
+ PhoenixStatement statement = conn.createStatement().unwrap(PhoenixStatement.class);
+ QueryPlan plan = statement.optimizeQuery(query);
+ assertEquals("IDX", plan.getContext().getCurrentTable().getTable().getName().getString());
+ plan.iterator();
+ List<List<Scan>> outerScans = plan.getScans();
+ assertEquals(1, outerScans.size());
+ List<Scan> innerScans = outerScans.get(0);
+ assertEquals(1, innerScans.size());
+ Scan scan = innerScans.get(0);
+ assertEquals("C", Bytes.toString(scan.getStartRow()).trim());
+ assertEquals("E", Bytes.toString(scan.getStopRow()).trim());
+ }
+ }
+
+ @Test
+ public void testSkipScanLocalIndexPruning() throws SQLException {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute("CREATE TABLE T (\n" +
+ " A CHAR(1) NOT NULL,\n" +
+ " B CHAR(1) NOT NULL,\n" +
+ " C CHAR(1) NOT NULL,\n" +
+ " D CHAR(1) NOT NULL,\n" +
+ " CONSTRAINT PK PRIMARY KEY (\n" +
+ " A,\n" +
+ " B,\n" +
+ " C,\n" +
+ " D\n" +
+ " )\n" +
+ ") SPLIT ON ('A','C','E','G','I')");
+ conn.createStatement().execute("CREATE LOCAL INDEX IDX ON T(A,B,D)");
+ String query = "SELECT * FROM T WHERE A IN ('A','G') and B = 'A' and D = 'D'";
+ PhoenixStatement statement = conn.createStatement().unwrap(PhoenixStatement.class);
+ QueryPlan plan = statement.optimizeQuery(query);
+ assertEquals("IDX", plan.getContext().getCurrentTable().getTable().getName().getString());
+ plan.iterator();
+ List<List<Scan>> outerScans = plan.getScans();
+ assertEquals(2, outerScans.size());
+ List<Scan> innerScans1 = outerScans.get(0);
+ assertEquals(1, innerScans1.size());
+ Scan scan1 = innerScans1.get(0);
+ assertEquals("A", Bytes.toString(scan1.getStartRow()).trim());
+ assertEquals("C", Bytes.toString(scan1.getStopRow()).trim());
+ List<Scan> innerScans2 = outerScans.get(1);
+ assertEquals(1, innerScans2.size());
+ Scan scan2 = innerScans2.get(0);
+ assertEquals("G", Bytes.toString(scan2.getStartRow()).trim());
+ assertEquals("I", Bytes.toString(scan2.getStopRow()).trim());
+ }
+ }
+
+ @Test
+ public void testRVCLocalIndexPruning() throws SQLException {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute("CREATE TABLE T (\n" +
+ " A CHAR(1) NOT NULL,\n" +
+ " B CHAR(1) NOT NULL,\n" +
+ " C CHAR(1) NOT NULL,\n" +
+ " D CHAR(1) NOT NULL,\n" +
+ " CONSTRAINT PK PRIMARY KEY (\n" +
+ " A,\n" +
+ " B,\n" +
+ " C,\n" +
+ " D\n" +
+ " )\n" +
+ ") SPLIT ON ('A','C','E','G','I')");
+ conn.createStatement().execute("CREATE LOCAL INDEX IDX ON T(A,B,D)");
+ String query = "SELECT * FROM T WHERE A='I' and (B,D) IN (('A','D'),('B','I'))";
+ PhoenixStatement statement = conn.createStatement().unwrap(PhoenixStatement.class);
+ QueryPlan plan = statement.optimizeQuery(query);
+ assertEquals("IDX", plan.getContext().getCurrentTable().getTable().getName().getString());
+ plan.iterator();
+ List<List<Scan>> outerScans = plan.getScans();
+ assertEquals(1, outerScans.size());
+ List<Scan> innerScans = outerScans.get(0);
+ assertEquals(1, innerScans.size());
+ Scan scan = innerScans.get(0);
+ assertEquals("I", Bytes.toString(scan.getStartRow()).trim());
+ assertEquals(0, scan.getStopRow().length);
+ }
+ }
+
+ @Test
+ public void testRVCLocalIndexPruning2() throws SQLException {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute("CREATE TABLE T (\n" +
+ " A CHAR(1) NOT NULL,\n" +
+ " B VARCHAR,\n" +
+ " C VARCHAR,\n" +
+ " D VARCHAR,\n" +
+ " E VARCHAR,\n" +
+ " F VARCHAR,\n" +
+ " G VARCHAR,\n" +
+ " CONSTRAINT PK PRIMARY KEY (\n" +
+ " A,\n" +
+ " B,\n" +
+ " C,\n" +
+ " D,\n" +
+ " E,\n" +
+ " F,\n" +
+ " G\n" +
+ " )\n" +
+ ") SPLIT ON ('A','C','E','G','I')");
+ conn.createStatement().execute("CREATE LOCAL INDEX IDX ON T(A,B,C,F,G)");
+ String query = "SELECT * FROM T WHERE (A,B,C,D) IN (('I','D','F','X'),('I','I','G','Y')) and F='X' and G='Y'";
+ PhoenixStatement statement = conn.createStatement().unwrap(PhoenixStatement.class);
+ QueryPlan plan = statement.optimizeQuery(query);
+ assertEquals("IDX", plan.getContext().getCurrentTable().getTable().getName().getString());
+ plan.iterator();
+ List<List<Scan>> outerScans = plan.getScans();
+ assertEquals(1, outerScans.size());
+ List<Scan> innerScans = outerScans.get(0);
+ assertEquals(1, innerScans.size());
+ Scan scan = innerScans.get(0);
+ assertEquals("I", Bytes.toString(scan.getStartRow()).trim());
+ assertEquals(0, scan.getStopRow().length);
+ }
+ }
+
+ @Test
+ public void testMinMaxRangeLocalIndexPruning() throws SQLException {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute("CREATE TABLE T (\n" +
+ " A CHAR(1) NOT NULL,\n" +
+ " B CHAR(1) NOT NULL,\n" +
+ " C CHAR(1) NOT NULL,\n" +
+ " D CHAR(1) NOT NULL,\n" +
+ " CONSTRAINT PK PRIMARY KEY (\n" +
+ " A,\n" +
+ " B,\n" +
+ " C,\n" +
+ " D\n" +
+ " )\n" +
+ ") SPLIT ON ('A','C','E','G','I')");
+ conn.createStatement().execute("CREATE LOCAL INDEX IDX ON T(A,B,D)");
+ String query = "SELECT * FROM T WHERE A = 'C' and (A,B,D) > ('C','B','X') and D='C'";
+ PhoenixStatement statement = conn.createStatement().unwrap(PhoenixStatement.class);
+ QueryPlan plan = statement.optimizeQuery(query);
+ assertEquals("IDX", plan.getContext().getCurrentTable().getTable().getName().getString());
+ plan.iterator();
+ List<List<Scan>> outerScans = plan.getScans();
+ assertEquals(1, outerScans.size());
+ List<Scan> innerScans = outerScans.get(0);
+ assertEquals(1, innerScans.size());
+ Scan scan = innerScans.get(0);
+ assertEquals("C", Bytes.toString(scan.getStartRow()).trim());
+ assertEquals("E", Bytes.toString(scan.getStopRow()).trim());
+ }
+ }
+
+ @Test
+ public void testNoLocalIndexPruning() throws SQLException {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute("CREATE TABLE T (\n" +
+ " A CHAR(1) NOT NULL,\n" +
+ " B CHAR(1) NOT NULL,\n" +
+ " C CHAR(1) NOT NULL,\n" +
+ " CONSTRAINT PK PRIMARY KEY (\n" +
+ " A,\n" +
+ " B,\n" +
+ " C\n" +
+ " )\n" +
+ ") SPLIT ON ('A','C','E','G','I')");
+ conn.createStatement().execute("CREATE LOCAL INDEX IDX ON T(C)");
+ String query = "SELECT * FROM T WHERE C='C'";
+ PhoenixStatement statement = conn.createStatement().unwrap(PhoenixStatement.class);
+ QueryPlan plan = statement.optimizeQuery(query);
+ assertEquals("IDX", plan.getContext().getCurrentTable().getTable().getName().getString());
+ plan.iterator();
+ List<List<Scan>> outerScans = plan.getScans();
+ assertEquals(6, outerScans.size());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ba8d7b6/phoenix-core/src/test/java/org/apache/phoenix/query/KeyRangeClipTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/KeyRangeClipTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/KeyRangeClipTest.java
new file mode 100644
index 0000000..abc435a
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/KeyRangeClipTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.query;
+
+import static org.apache.phoenix.query.KeyRange.UNBOUND;
+import static org.apache.phoenix.query.QueryConstants.DESC_SEPARATOR_BYTE_ARRAY;
+import static org.apache.phoenix.query.QueryConstants.SEPARATOR_BYTE_ARRAY;
+import static org.junit.Assert.assertEquals;
+
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.iterate.BaseResultIterators;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PSmallint;
+import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Lists;
+
+
+/**
+ * Test for intersect method in {@link SkipScanFilter}
+ */
+@RunWith(Parameterized.class)
+public class KeyRangeClipTest extends BaseConnectionlessQueryTest {
+ private final RowKeySchema schema;
+ private final KeyRange input;
+ private final KeyRange expectedOutput;
+ private final int clipTo;
+
+ private static byte[] getRange(PhoenixConnection pconn, List<Object> startValues) throws SQLException {
+ byte[] lowerRange;
+ if (startValues == null) {
+ lowerRange = KeyRange.UNBOUND;
+ } else {
+ String upsertValues = StringUtils.repeat("?,", startValues.size()).substring(0,startValues.size() * 2 - 1);
+ String upsertStmt = "UPSERT INTO T VALUES(" + upsertValues + ")";
+ PreparedStatement stmt = pconn.prepareStatement(upsertStmt);
+ for (int i = 0; i < startValues.size(); i++) {
+ stmt.setObject(i+1, startValues.get(i));
+ }
+ stmt.execute();
+ Cell startCell = PhoenixRuntime.getUncommittedDataIterator(pconn).next().getSecond().get(0);
+ lowerRange = CellUtil.cloneRow(startCell);
+ pconn.rollback();
+ }
+ return lowerRange;
+ }
+
+ public KeyRangeClipTest(String tableDef, List<Object> startValues, List<Object> endValues, int clipTo, KeyRange expectedOutput) throws SQLException {
+ PhoenixConnection pconn = DriverManager.getConnection(getUrl()).unwrap(PhoenixConnection.class);
+ pconn.createStatement().execute("CREATE TABLE T(" + tableDef+ ")");
+ PTable table = pconn.getMetaDataCache().getTableRef(new PTableKey(null,"T")).getTable();
+ this.schema = table.getRowKeySchema();
+ byte[] lowerRange = getRange(pconn, startValues);
+ byte[] upperRange = getRange(pconn, endValues);
+ this.input = KeyRange.getKeyRange(lowerRange, upperRange);
+ this.expectedOutput = expectedOutput;
+ this.clipTo = clipTo;
+ }
+
+ @After
+ public void cleanup() throws SQLException {
+ PhoenixConnection pconn = DriverManager.getConnection(getUrl()).unwrap(PhoenixConnection.class);
+ pconn.createStatement().execute("DROP TABLE T");
+ }
+
+ @Test
+ public void test() {
+ ScanRanges scanRanges = ScanRanges.create(schema, Collections.<List<KeyRange>>singletonList(Collections.<KeyRange>singletonList(input)), new int[] {schema.getFieldCount()-1}, KeyRange.EVERYTHING_RANGE, null, false, -1);
+ ScanRanges clippedRange = BaseResultIterators.computePrefixScanRanges(scanRanges, clipTo);
+ assertEquals(expectedOutput, clippedRange.getScanRange());
+ }
+
+ @Parameters(name="KeyRangeClipTest_{0}")
+ public static Collection<Object> data() {
+ List<Object> testCases = Lists.newArrayList();
+ testCases.add(Lists.newArrayList( // [XY - *]
+ "A VARCHAR NOT NULL, B VARCHAR, C VARCHAR, CONSTRAINT PK PRIMARY KEY (A,B,C)",
+ Lists.newArrayList("XY",null,"Z"), null, 2,
+ KeyRange.getKeyRange(Bytes.toBytes("XY"), true, UNBOUND, false)).toArray());
+ testCases.add(Lists.newArrayList(
+ "A VARCHAR NOT NULL, B VARCHAR, C VARCHAR, CONSTRAINT PK PRIMARY KEY (A,B,C)",
+ null, Lists.newArrayList("XY",null,"Z"), 2,
+ KeyRange.getKeyRange(
+ ByteUtil.nextKey(SEPARATOR_BYTE_ARRAY), true, // skips null values for unbound lower
+ ByteUtil.nextKey(ByteUtil.concat(Bytes.toBytes("XY"),SEPARATOR_BYTE_ARRAY,SEPARATOR_BYTE_ARRAY,SEPARATOR_BYTE_ARRAY)), false)).toArray());
+ testCases.add(Lists.newArrayList(
+ "A VARCHAR NOT NULL, B VARCHAR, C VARCHAR, D VARCHAR, CONSTRAINT PK PRIMARY KEY (A,B,C,D)",
+ Lists.newArrayList("XY",null,null,"Z"), null, 3,
+ KeyRange.getKeyRange(Bytes.toBytes("XY"), true, UNBOUND, false)).toArray());
+ testCases.add(Lists.newArrayList(
+ "A VARCHAR NOT NULL, B VARCHAR, C VARCHAR, D VARCHAR, CONSTRAINT PK PRIMARY KEY (A,B,C,D)",
+ null, Lists.newArrayList("XY",null,null,"Z"), 3,
+ KeyRange.getKeyRange(
+ ByteUtil.nextKey(SEPARATOR_BYTE_ARRAY), true, // skips null values for unbound lower
+ ByteUtil.nextKey(ByteUtil.concat(Bytes.toBytes("XY"),SEPARATOR_BYTE_ARRAY,SEPARATOR_BYTE_ARRAY,SEPARATOR_BYTE_ARRAY,SEPARATOR_BYTE_ARRAY)), false)).toArray());
+ testCases.add(Lists.newArrayList(
+ "A CHAR(1) NOT NULL, B CHAR(1) NOT NULL, C CHAR(1) NOT NULL, CONSTRAINT PK PRIMARY KEY (A,B,C)",
+ Lists.newArrayList("A","B","C"), Lists.newArrayList("C","D","E"), 2,
+ KeyRange.getKeyRange(Bytes.toBytes("AB"), true, ByteUtil.nextKey(Bytes.toBytes("CD")), false)).toArray());
+ testCases.add(Lists.newArrayList(
+ "A VARCHAR NOT NULL, B VARCHAR, C SMALLINT NOT NULL, D VARCHAR, CONSTRAINT PK PRIMARY KEY (A,B,C,D)",
+ Lists.<Object>newArrayList("XY",null,1,"Z"), null, 3,
+ KeyRange.getKeyRange(ByteUtil.concat(Bytes.toBytes("XY"), SEPARATOR_BYTE_ARRAY, SEPARATOR_BYTE_ARRAY, PSmallint.INSTANCE.toBytes(1)), true, UNBOUND, false)).toArray());
+ testCases.add(Lists.newArrayList(
+ "A VARCHAR NOT NULL, B BIGINT NOT NULL, C VARCHAR, CONSTRAINT PK PRIMARY KEY (A,B DESC,C)",
+ Lists.<Object>newArrayList("XYZ",1,"Z"), null, 2,
+ KeyRange.getKeyRange(ByteUtil.concat(Bytes.toBytes("XYZ"), SEPARATOR_BYTE_ARRAY, PLong.INSTANCE.toBytes(1, SortOrder.DESC)), true, UNBOUND, false)).toArray());
+ testCases.add(Lists.newArrayList(
+ "A VARCHAR NOT NULL, B VARCHAR, C VARCHAR, CONSTRAINT PK PRIMARY KEY (A DESC,B,C)",
+ null, Lists.newArrayList("XY",null,"Z"), 3,
+ KeyRange.getKeyRange(
+ ByteUtil.nextKey(SEPARATOR_BYTE_ARRAY), true, // skips null values for unbound lower
+ (ByteUtil.concat(PVarchar.INSTANCE.toBytes("XY",SortOrder.DESC),DESC_SEPARATOR_BYTE_ARRAY,SEPARATOR_BYTE_ARRAY,Bytes.toBytes("Z"))), false)).toArray());
+ return testCases;
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ba8d7b6/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index 0f12d9c..1903dda 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -493,7 +493,7 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest {
return Cost.ZERO;
}
- }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()), context.getScan(), false, null);
+ }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()), context.getScan(), false, null, null);
List<KeyRange> keyRanges = parallelIterators.getSplits();
return keyRanges;
}