You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by gj...@apache.org on 2017/11/13 21:08:55 UTC
phoenix git commit: PHOENIX-4342 - Surface QueryPlan in MutationPlan
Repository: phoenix
Updated Branches:
refs/heads/master 2a8e1c750 -> 1d8a6bc3a
PHOENIX-4342 - Surface QueryPlan in MutationPlan
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/1d8a6bc3
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/1d8a6bc3
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/1d8a6bc3
Branch: refs/heads/master
Commit: 1d8a6bc3a6a277d9e3201066b753fa9fd7018545
Parents: 2a8e1c7
Author: Geoffrey Jacoby <gj...@apache.org>
Authored: Thu Nov 2 13:41:02 2017 -0700
Committer: Geoffrey Jacoby <gj...@apache.org>
Committed: Mon Nov 13 11:47:50 2017 -0800
----------------------------------------------------------------------
.../phoenix/compile/BaseMutationPlan.java | 5 +
.../phoenix/compile/DelegateMutationPlan.java | 5 +
.../apache/phoenix/compile/DeleteCompiler.java | 545 ++++++++-------
.../apache/phoenix/compile/MutationPlan.java | 5 +-
.../apache/phoenix/compile/UpsertCompiler.java | 675 +++++++++++--------
.../apache/phoenix/jdbc/PhoenixStatement.java | 9 +-
6 files changed, 733 insertions(+), 511 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d8a6bc3/phoenix-core/src/main/java/org/apache/phoenix/compile/BaseMutationPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/BaseMutationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/BaseMutationPlan.java
index 0e45682..60eb59a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/BaseMutationPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/BaseMutationPlan.java
@@ -79,4 +79,9 @@ public abstract class BaseMutationPlan implements MutationPlan {
return 0l;
}
+ @Override
+ public QueryPlan getQueryPlan() {
+ return null;
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d8a6bc3/phoenix-core/src/main/java/org/apache/phoenix/compile/DelegateMutationPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DelegateMutationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DelegateMutationPlan.java
index 343ec32..90eef61 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DelegateMutationPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DelegateMutationPlan.java
@@ -42,6 +42,11 @@ public class DelegateMutationPlan implements MutationPlan {
}
@Override
+ public QueryPlan getQueryPlan() {
+ return plan.getQueryPlan();
+ }
+
+ @Override
public ParameterMetaData getParameterMetaData() {
return plan.getParameterMetaData();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d8a6bc3/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index f038cda..8d9a5b6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -303,14 +303,16 @@ public class DeleteCompiler {
return Collections.emptyList();
}
- private class MultiDeleteMutationPlan implements MutationPlan {
+ private class MultiRowDeleteMutationPlan implements MutationPlan {
private final List<MutationPlan> plans;
private final MutationPlan firstPlan;
-
- public MultiDeleteMutationPlan(@NotNull List<MutationPlan> plans) {
+ private final QueryPlan dataPlan;
+
+ public MultiRowDeleteMutationPlan(QueryPlan dataPlan, @NotNull List<MutationPlan> plans) {
Preconditions.checkArgument(!plans.isEmpty());
this.plans = plans;
this.firstPlan = plans.get(0);
+ this.dataPlan = dataPlan;
}
@Override
@@ -348,8 +350,8 @@ public class DeleteCompiler {
return firstPlan.getSourceRefs();
}
- @Override
- public Operation getOperation() {
+ @Override
+ public Operation getOperation() {
return operation;
}
@@ -401,6 +403,11 @@ public class DeleteCompiler {
}
return estInfoTimestamp;
}
+
+ @Override
+ public QueryPlan getQueryPlan() {
+ return dataPlan;
+ }
}
public MutationPlan compile(DeleteStatement delete) throws SQLException {
@@ -548,69 +555,9 @@ public class DeleteCompiler {
List<MutationPlan> mutationPlans = Lists.newArrayListWithExpectedSize(queryPlans.size());
for (final QueryPlan plan : queryPlans) {
final StatementContext context = plan.getContext();
- mutationPlans.add(new MutationPlan() {
-
- @Override
- public ParameterMetaData getParameterMetaData() {
- return context.getBindManager().getParameterMetaData();
- }
-
- @Override
- public MutationState execute() throws SQLException {
- // We have a point lookup, so we know we have a simple set of fully qualified
- // keys for our ranges
- ScanRanges ranges = context.getScanRanges();
- Iterator<KeyRange> iterator = ranges.getPointLookupKeyIterator();
- Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount());
- while (iterator.hasNext()) {
- mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
- }
- return new MutationState(plan.getTableRef(), mutation, 0, maxSize, maxSizeBytes, connection);
- }
-
- @Override
- public ExplainPlan getExplainPlan() throws SQLException {
- return new ExplainPlan(Collections.singletonList("DELETE SINGLE ROW"));
- }
-
- @Override
- public StatementContext getContext() {
- return context;
- }
-
- @Override
- public TableRef getTargetRef() {
- return dataPlan.getTableRef();
- }
-
- @Override
- public Set<TableRef> getSourceRefs() {
- // Don't include the target
- return Collections.emptySet();
- }
-
- @Override
- public Operation getOperation() {
- return operation;
- }
-
- @Override
- public Long getEstimatedRowsToScan() throws SQLException {
- return 0l;
- }
-
- @Override
- public Long getEstimatedBytesToScan() throws SQLException {
- return 0l;
- }
-
- @Override
- public Long getEstimateInfoTimestamp() throws SQLException {
- return 0l;
- }
- });
+ mutationPlans.add(new SingleRowDeleteMutationPlan(plan, connection, maxSize, maxSizeBytes));
}
- return new MultiDeleteMutationPlan(mutationPlans);
+ return new MultiRowDeleteMutationPlan(dataPlan, mutationPlans);
} else if (runOnServer) {
// TODO: better abstraction
final StatementContext context = dataPlan.getContext();
@@ -629,91 +576,7 @@ public class DeleteCompiler {
final RowProjector projector = projectorToBe;
final QueryPlan aggPlan = new AggregatePlan(context, select, dataPlan.getTableRef(), projector, null, null,
OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null);
- return new MutationPlan() {
- @Override
- public ParameterMetaData getParameterMetaData() {
- return context.getBindManager().getParameterMetaData();
- }
-
- @Override
- public StatementContext getContext() {
- return context;
- }
-
- @Override
- public TableRef getTargetRef() {
- return dataPlan.getTableRef();
- }
-
- @Override
- public Set<TableRef> getSourceRefs() {
- return dataPlan.getSourceRefs();
- }
-
- @Override
- public Operation getOperation() {
- return operation;
- }
-
- @Override
- public MutationState execute() throws SQLException {
- // TODO: share this block of code with UPSERT SELECT
- ImmutableBytesWritable ptr = context.getTempPtr();
- PTable table = dataPlan.getTableRef().getTable();
- table.getIndexMaintainers(ptr, context.getConnection());
- byte[] txState = table.isTransactional() ? connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY;
- ServerCache cache = null;
- try {
- if (ptr.getLength() > 0) {
- byte[] uuidValue = ServerCacheClient.generateId();
- context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
- context.getScan().setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
- context.getScan().setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
- }
- ResultIterator iterator = aggPlan.iterator();
- try {
- Tuple row = iterator.next();
- final long mutationCount = (Long)projector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr);
- return new MutationState(maxSize, maxSizeBytes, connection) {
- @Override
- public long getUpdateCount() {
- return mutationCount;
- }
- };
- } finally {
- iterator.close();
- }
- } finally {
- if (cache != null) {
- cache.close();
- }
- }
- }
-
- @Override
- public ExplainPlan getExplainPlan() throws SQLException {
- List<String> queryPlanSteps = aggPlan.getExplainPlan().getPlanSteps();
- List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
- planSteps.add("DELETE ROWS");
- planSteps.addAll(queryPlanSteps);
- return new ExplainPlan(planSteps);
- }
-
- @Override
- public Long getEstimatedRowsToScan() throws SQLException {
- return aggPlan.getEstimatedRowsToScan();
- }
-
- @Override
- public Long getEstimatedBytesToScan() throws SQLException {
- return aggPlan.getEstimatedBytesToScan();
- }
-
- @Override
- public Long getEstimateInfoTimestamp() throws SQLException {
- return aggPlan.getEstimateInfoTimestamp();
- }
- };
+ return new ServerSelectDeleteMutationPlan(dataPlan, connection, aggPlan, projector, maxSize, maxSizeBytes);
} else {
final DeletingParallelIteratorFactory parallelIteratorFactory = parallelIteratorFactoryToBe;
List<PColumn> adjustedProjectedColumns = Lists.newArrayListWithExpectedSize(projectedColumns.size());
@@ -749,90 +612,322 @@ public class DeleteCompiler {
if (!bestPlan.getTableRef().getTable().equals(targetTableRef.getTable())) {
otherTableRefs.add(projectedTableRef);
}
- final StatementContext context = bestPlan.getContext();
- return new MutationPlan() {
- @Override
- public ParameterMetaData getParameterMetaData() {
- return context.getBindManager().getParameterMetaData();
- }
+ return new ClientSelectDeleteMutationPlan(targetTableRef, dataPlan, bestPlan, hasPreOrPostProcessing,
+ parallelIteratorFactory, otherTableRefs, projectedTableRef, maxSize, maxSizeBytes, connection);
+ }
+ }
- @Override
- public StatementContext getContext() {
- return context;
- }
+ private class SingleRowDeleteMutationPlan implements MutationPlan {
- @Override
- public TableRef getTargetRef() {
- return targetTableRef;
- }
+ private final QueryPlan dataPlan;
+ private final PhoenixConnection connection;
+ private final int maxSize;
+ private final StatementContext context;
+ private final int maxSizeBytes;
- @Override
- public Set<TableRef> getSourceRefs() {
- return dataPlan.getSourceRefs();
- }
+ public SingleRowDeleteMutationPlan(QueryPlan dataPlan, PhoenixConnection connection, int maxSize, int maxSizeBytes) {
+ this.dataPlan = dataPlan;
+ this.connection = connection;
+ this.maxSize = maxSize;
+ this.context = dataPlan.getContext();
+ this.maxSizeBytes = maxSizeBytes;
+ }
+
+ @Override
+ public ParameterMetaData getParameterMetaData() {
+ return context.getBindManager().getParameterMetaData();
+ }
- @Override
- public Operation getOperation() {
- return operation;
- }
-
- @Override
- public MutationState execute() throws SQLException {
- ResultIterator iterator = bestPlan.iterator();
- try {
- if (!hasPreOrPostProcessing) {
- Tuple tuple;
- long totalRowCount = 0;
- if (parallelIteratorFactory != null) {
- parallelIteratorFactory.setQueryPlan(bestPlan);
- parallelIteratorFactory.setOtherTableRefs(otherTableRefs);
- parallelIteratorFactory.setProjectedTableRef(projectedTableRef);
- }
- while ((tuple=iterator.next()) != null) {// Runs query
- Cell kv = tuple.getValue(0);
- totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault());
- }
- // Return total number of rows that have been deleted from the table. In the case of auto commit being off
- // the mutations will all be in the mutation state of the current connection. We need to divide by the
- // total number of tables we updated as otherwise the client will get an unexpected result
- MutationState state = new MutationState(maxSize, maxSizeBytes, connection, totalRowCount / ((bestPlan.getTableRef().getTable().getIndexType() == IndexType.LOCAL && !otherTableRefs.isEmpty() ? 0 : 1) + otherTableRefs.size()));
-
- // set the read metrics accumulated in the parent context so that it can be published when the mutations are committed.
- state.setReadMetricQueue(context.getReadMetricsQueue());
-
- return state;
- } else {
- return deleteRows(context, iterator, bestPlan, projectedTableRef, otherTableRefs);
+ @Override
+ public MutationState execute() throws SQLException {
+ // We have a point lookup, so we know we have a simple set of fully qualified
+ // keys for our ranges
+ ScanRanges ranges = context.getScanRanges();
+ Iterator<KeyRange> iterator = ranges.getPointLookupKeyIterator();
+ Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount());
+ while (iterator.hasNext()) {
+ mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()),
+ new RowMutationState(PRow.DELETE_MARKER,
+ statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
+ }
+ return new MutationState(dataPlan.getTableRef(), mutation, 0, maxSize, maxSizeBytes, connection);
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ return new ExplainPlan(Collections.singletonList("DELETE SINGLE ROW"));
+ }
+
+ @Override
+ public QueryPlan getQueryPlan() {
+ return dataPlan;
+ }
+
+ @Override
+ public StatementContext getContext() {
+ return context;
+ }
+
+ @Override
+ public TableRef getTargetRef() {
+ return dataPlan.getTableRef();
+ }
+
+ @Override
+ public Set<TableRef> getSourceRefs() {
+ // Don't include the target
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Operation getOperation() {
+ return operation;
+ }
+
+ @Override
+ public Long getEstimatedRowsToScan() throws SQLException {
+ return 0l;
+ }
+
+ @Override
+ public Long getEstimatedBytesToScan() throws SQLException {
+ return 0l;
+ }
+
+ @Override
+ public Long getEstimateInfoTimestamp() throws SQLException {
+ return 0l;
+ }
+ }
+
+ private class ServerSelectDeleteMutationPlan implements MutationPlan {
+ private final StatementContext context;
+ private final QueryPlan dataPlan;
+ private final PhoenixConnection connection;
+ private final QueryPlan aggPlan;
+ private final RowProjector projector;
+ private final int maxSize;
+ private final int maxSizeBytes;
+
+ public ServerSelectDeleteMutationPlan(QueryPlan dataPlan, PhoenixConnection connection, QueryPlan aggPlan,
+ RowProjector projector, int maxSize, int maxSizeBytes) {
+ this.context = dataPlan.getContext();
+ this.dataPlan = dataPlan;
+ this.connection = connection;
+ this.aggPlan = aggPlan;
+ this.projector = projector;
+ this.maxSize = maxSize;
+ this.maxSizeBytes = maxSizeBytes;
+ }
+
+ @Override
+ public ParameterMetaData getParameterMetaData() {
+ return context.getBindManager().getParameterMetaData();
+ }
+
+ @Override
+ public StatementContext getContext() {
+ return context;
+ }
+
+ @Override
+ public TableRef getTargetRef() {
+ return dataPlan.getTableRef();
+ }
+
+ @Override
+ public Set<TableRef> getSourceRefs() {
+ return dataPlan.getSourceRefs();
+ }
+
+ @Override
+ public Operation getOperation() {
+ return operation;
+ }
+
+ @Override
+ public MutationState execute() throws SQLException {
+ // TODO: share this block of code with UPSERT SELECT
+ ImmutableBytesWritable ptr = context.getTempPtr();
+ PTable table = dataPlan.getTableRef().getTable();
+ table.getIndexMaintainers(ptr, context.getConnection());
+ byte[] txState = table.isTransactional() ? connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY;
+ ServerCache cache = null;
+ try {
+ if (ptr.getLength() > 0) {
+ byte[] uuidValue = ServerCacheClient.generateId();
+ context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+ context.getScan().setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
+ context.getScan().setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+ }
+ ResultIterator iterator = aggPlan.iterator();
+ try {
+ Tuple row = iterator.next();
+ final long mutationCount = (Long) projector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr);
+ return new MutationState(maxSize, maxSizeBytes, connection) {
+ @Override
+ public long getUpdateCount() {
+ return mutationCount;
}
- } finally {
- iterator.close();
- }
+ };
+ } finally {
+ iterator.close();
}
-
- @Override
- public ExplainPlan getExplainPlan() throws SQLException {
- List<String> queryPlanSteps = bestPlan.getExplainPlan().getPlanSteps();
- List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
- planSteps.add("DELETE ROWS");
- planSteps.addAll(queryPlanSteps);
- return new ExplainPlan(planSteps);
+ } finally {
+ if (cache != null) {
+ cache.close();
}
+ }
+ }
- @Override
- public Long getEstimatedRowsToScan() throws SQLException {
- return bestPlan.getEstimatedRowsToScan();
- }
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ List<String> queryPlanSteps = aggPlan.getExplainPlan().getPlanSteps();
+ List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+ planSteps.add("DELETE ROWS");
+ planSteps.addAll(queryPlanSteps);
+ return new ExplainPlan(planSteps);
+ }
- @Override
- public Long getEstimatedBytesToScan() throws SQLException {
- return bestPlan.getEstimatedBytesToScan();
- }
+ @Override
+ public Long getEstimatedRowsToScan() throws SQLException {
+ return aggPlan.getEstimatedRowsToScan();
+ }
- @Override
- public Long getEstimateInfoTimestamp() throws SQLException {
- return bestPlan.getEstimateInfoTimestamp();
+ @Override
+ public Long getEstimatedBytesToScan() throws SQLException {
+ return aggPlan.getEstimatedBytesToScan();
+ }
+
+ @Override
+ public Long getEstimateInfoTimestamp() throws SQLException {
+ return aggPlan.getEstimateInfoTimestamp();
+ }
+
+ @Override
+ public QueryPlan getQueryPlan() {
+ return aggPlan;
+ }
+ }
+
+ private class ClientSelectDeleteMutationPlan implements MutationPlan {
+ private final StatementContext context;
+ private final TableRef targetTableRef;
+ private final QueryPlan dataPlan;
+ private final QueryPlan bestPlan;
+ private final boolean hasPreOrPostProcessing;
+ private final DeletingParallelIteratorFactory parallelIteratorFactory;
+ private final List<TableRef> otherTableRefs;
+ private final TableRef projectedTableRef;
+ private final int maxSize;
+ private final int maxSizeBytes;
+ private final PhoenixConnection connection;
+
+ public ClientSelectDeleteMutationPlan(TableRef targetTableRef, QueryPlan dataPlan, QueryPlan bestPlan,
+ boolean hasPreOrPostProcessing,
+ DeletingParallelIteratorFactory parallelIteratorFactory,
+ List<TableRef> otherTableRefs, TableRef projectedTableRef, int maxSize,
+ int maxSizeBytes, PhoenixConnection connection) {
+ this.context = bestPlan.getContext();
+ this.targetTableRef = targetTableRef;
+ this.dataPlan = dataPlan;
+ this.bestPlan = bestPlan;
+ this.hasPreOrPostProcessing = hasPreOrPostProcessing;
+ this.parallelIteratorFactory = parallelIteratorFactory;
+ this.otherTableRefs = otherTableRefs;
+ this.projectedTableRef = projectedTableRef;
+ this.maxSize = maxSize;
+ this.maxSizeBytes = maxSizeBytes;
+ this.connection = connection;
+ }
+
+ @Override
+ public ParameterMetaData getParameterMetaData() {
+ return context.getBindManager().getParameterMetaData();
+ }
+
+ @Override
+ public StatementContext getContext() {
+ return context;
+ }
+
+ @Override
+ public TableRef getTargetRef() {
+ return targetTableRef;
+ }
+
+ @Override
+ public Set<TableRef> getSourceRefs() {
+ return dataPlan.getSourceRefs();
+ }
+
+ @Override
+ public Operation getOperation() {
+ return operation;
+ }
+
+ @Override
+ public MutationState execute() throws SQLException {
+ ResultIterator iterator = bestPlan.iterator();
+ try {
+ if (!hasPreOrPostProcessing) {
+ Tuple tuple;
+ long totalRowCount = 0;
+ if (parallelIteratorFactory != null) {
+ parallelIteratorFactory.setQueryPlan(bestPlan);
+ parallelIteratorFactory.setOtherTableRefs(otherTableRefs);
+ parallelIteratorFactory.setProjectedTableRef(projectedTableRef);
+ }
+ while ((tuple=iterator.next()) != null) {// Runs query
+ Cell kv = tuple.getValue(0);
+ totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault());
+ }
+ // Return total number of rows that have been deleted from the table. In the case of auto commit being off
+ // the mutations will all be in the mutation state of the current connection. We need to divide by the
+ // total number of tables we updated as otherwise the client will get an unexpected result
+ MutationState state = new MutationState(maxSize, maxSizeBytes, connection,
+ totalRowCount /
+ ((bestPlan.getTableRef().getTable().getIndexType() == IndexType.LOCAL && !otherTableRefs.isEmpty() ? 0 : 1) + otherTableRefs.size()));
+
+ // set the read metrics accumulated in the parent context so that it can be published when the mutations are committed.
+ state.setReadMetricQueue(context.getReadMetricsQueue());
+
+ return state;
+ } else {
+ return deleteRows(context, iterator, bestPlan, projectedTableRef, otherTableRefs);
}
- };
+ } finally {
+ iterator.close();
+ }
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ List<String> queryPlanSteps = bestPlan.getExplainPlan().getPlanSteps();
+ List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+ planSteps.add("DELETE ROWS");
+ planSteps.addAll(queryPlanSteps);
+ return new ExplainPlan(planSteps);
+ }
+
+ @Override
+ public Long getEstimatedRowsToScan() throws SQLException {
+ return bestPlan.getEstimatedRowsToScan();
+ }
+
+ @Override
+ public Long getEstimatedBytesToScan() throws SQLException {
+ return bestPlan.getEstimatedBytesToScan();
+ }
+
+ @Override
+ public Long getEstimateInfoTimestamp() throws SQLException {
+ return bestPlan.getEstimateInfoTimestamp();
+ }
+
+ @Override
+ public QueryPlan getQueryPlan() {
+ return bestPlan;
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d8a6bc3/phoenix-core/src/main/java/org/apache/phoenix/compile/MutationPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutationPlan.java
index ddc2004..97f3f3d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutationPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutationPlan.java
@@ -24,6 +24,7 @@ import org.apache.phoenix.schema.TableRef;
public interface MutationPlan extends StatementPlan {
- public MutationState execute() throws SQLException;
- public TableRef getTargetRef();
+ MutationState execute() throws SQLException;
+ TableRef getTargetRef();
+ QueryPlan getQueryPlan();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d8a6bc3/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 6445894..3603ce7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -694,173 +694,13 @@ public class UpsertCompiler {
// Ignore order by - it has no impact
final QueryPlan aggPlan = new AggregatePlan(context, select, statementContext.getCurrentTable(), aggProjector, null,null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null);
- return new MutationPlan() {
- @Override
- public ParameterMetaData getParameterMetaData() {
- return queryPlan.getContext().getBindManager().getParameterMetaData();
- }
-
- @Override
- public StatementContext getContext() {
- return queryPlan.getContext();
- }
-
- @Override
- public TableRef getTargetRef() {
- return tableRef;
- }
-
- @Override
- public Set<TableRef> getSourceRefs() {
- return originalQueryPlan.getSourceRefs();
- }
-
- @Override
- public Operation getOperation() {
- return operation;
- }
-
- @Override
- public MutationState execute() throws SQLException {
- ImmutableBytesWritable ptr = context.getTempPtr();
- PTable table = tableRef.getTable();
- table.getIndexMaintainers(ptr, context.getConnection());
- byte[] txState = table.isTransactional() ? connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY;
-
- if (ptr.getLength() > 0) {
- byte[] uuidValue = ServerCacheClient.generateId();
- scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
- scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
- scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
- }
- ResultIterator iterator = aggPlan.iterator();
- try {
- Tuple row = iterator.next();
- final long mutationCount = (Long)aggProjector.getColumnProjector(0).getValue(row,
- PLong.INSTANCE, ptr);
- return new MutationState(maxSize, maxSizeBytes, connection) {
- @Override
- public long getUpdateCount() {
- return mutationCount;
- }
- };
- } finally {
- iterator.close();
- }
-
- }
-
- @Override
- public ExplainPlan getExplainPlan() throws SQLException {
- List<String> queryPlanSteps = aggPlan.getExplainPlan().getPlanSteps();
- List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
- planSteps.add("UPSERT ROWS");
- planSteps.addAll(queryPlanSteps);
- return new ExplainPlan(planSteps);
- }
-
- @Override
- public Long getEstimatedRowsToScan() throws SQLException {
- return aggPlan.getEstimatedRowsToScan();
- }
-
- @Override
- public Long getEstimatedBytesToScan() throws SQLException {
- return aggPlan.getEstimatedBytesToScan();
- }
-
- @Override
- public Long getEstimateInfoTimestamp() throws SQLException {
- return aggPlan.getEstimateInfoTimestamp();
- }
- };
+ return new ServerUpsertSelectMutationPlan(queryPlan, tableRef, originalQueryPlan, context, connection, scan, aggPlan, aggProjector, maxSize, maxSizeBytes);
}
}
////////////////////////////////////////////////////////////////////
// UPSERT SELECT run client-side
/////////////////////////////////////////////////////////////////////
- return new MutationPlan() {
- @Override
- public ParameterMetaData getParameterMetaData() {
- return queryPlan.getContext().getBindManager().getParameterMetaData();
- }
-
- @Override
- public StatementContext getContext() {
- return queryPlan.getContext();
- }
-
- @Override
- public TableRef getTargetRef() {
- return tableRef;
- }
-
- @Override
- public Set<TableRef> getSourceRefs() {
- return originalQueryPlan.getSourceRefs();
- }
-
- @Override
- public Operation getOperation() {
- return operation;
- }
-
- @Override
- public MutationState execute() throws SQLException {
- ResultIterator iterator = queryPlan.iterator();
- if (parallelIteratorFactory == null) {
- return upsertSelect(new StatementContext(statement), tableRef, projector, iterator, columnIndexes, pkSlotIndexes, useServerTimestamp, false);
- }
- try {
- parallelIteratorFactory.setRowProjector(projector);
- parallelIteratorFactory.setColumnIndexes(columnIndexes);
- parallelIteratorFactory.setPkSlotIndexes(pkSlotIndexes);
- Tuple tuple;
- long totalRowCount = 0;
- StatementContext context = queryPlan.getContext();
- while ((tuple=iterator.next()) != null) {// Runs query
- Cell kv = tuple.getValue(0);
- totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault());
- }
- // Return total number of rows that have been updated. In the case of auto commit being off
- // the mutations will all be in the mutation state of the current connection.
- MutationState mutationState = new MutationState(maxSize, maxSizeBytes, statement.getConnection(), totalRowCount);
- /*
- * All the metrics collected for measuring the reads done by the parallel mutating iterators
- * is included in the ReadMetricHolder of the statement context. Include these metrics in the
- * returned mutation state so they can be published on commit.
- */
- mutationState.setReadMetricQueue(context.getReadMetricsQueue());
- return mutationState;
- } finally {
- iterator.close();
- }
- }
-
- @Override
- public ExplainPlan getExplainPlan() throws SQLException {
- List<String> queryPlanSteps = queryPlan.getExplainPlan().getPlanSteps();
- List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
- planSteps.add("UPSERT SELECT");
- planSteps.addAll(queryPlanSteps);
- return new ExplainPlan(planSteps);
- }
-
- @Override
- public Long getEstimatedRowsToScan() throws SQLException {
- return queryPlan.getEstimatedRowsToScan();
- }
-
- @Override
- public Long getEstimatedBytesToScan() throws SQLException {
- return queryPlan.getEstimatedBytesToScan();
- }
-
- @Override
- public Long getEstimateInfoTimestamp() throws SQLException {
- return queryPlan.getEstimateInfoTimestamp();
- }
- };
+ return new ClientUpsertSelectMutationPlan(queryPlan, tableRef, originalQueryPlan, parallelIteratorFactory, projector, columnIndexes, pkSlotIndexes, useServerTimestamp, maxSize, maxSizeBytes);
}
@@ -986,124 +826,9 @@ public class UpsertCompiler {
}
final byte[] onDupKeyBytes = onDupKeyBytesToBe;
- return new MutationPlan() {
- @Override
- public ParameterMetaData getParameterMetaData() {
- return context.getBindManager().getParameterMetaData();
- }
-
- @Override
- public StatementContext getContext() {
- return context;
- }
-
- @Override
- public TableRef getTargetRef() {
- return tableRef;
- }
-
- @Override
- public Set<TableRef> getSourceRefs() {
- return Collections.emptySet();
- }
-
- @Override
- public Operation getOperation() {
- return operation;
- }
-
- @Override
- public MutationState execute() throws SQLException {
- ImmutableBytesWritable ptr = context.getTempPtr();
- final SequenceManager sequenceManager = context.getSequenceManager();
- // Next evaluate all the expressions
- int nodeIndex = nodeIndexOffset;
- PTable table = tableRef.getTable();
- Tuple tuple = sequenceManager.getSequenceCount() == 0 ? null :
- sequenceManager.newSequenceTuple(null);
- for (Expression constantExpression : constantExpressions) {
- PColumn column = allColumns.get(columnIndexes[nodeIndex]);
- constantExpression.evaluate(tuple, ptr);
- Object value = null;
- if (constantExpression.getDataType() != null) {
- value = constantExpression.getDataType().toObject(ptr, constantExpression.getSortOrder(), constantExpression.getMaxLength(), constantExpression.getScale());
- if (!constantExpression.getDataType().isCoercibleTo(column.getDataType(), value)) {
- throw TypeMismatchException.newException(
- constantExpression.getDataType(), column.getDataType(), "expression: "
- + constantExpression.toString() + " in column " + column);
- }
- if (!column.getDataType().isSizeCompatible(ptr, value, constantExpression.getDataType(),
- constantExpression.getSortOrder(), constantExpression.getMaxLength(),
- constantExpression.getScale(), column.getMaxLength(), column.getScale())) {
- throw new SQLExceptionInfo.Builder(
- SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString())
- .setMessage("value=" + constantExpression.toString()).build().buildException();
- }
- }
- column.getDataType().coerceBytes(ptr, value, constantExpression.getDataType(),
- constantExpression.getMaxLength(), constantExpression.getScale(), constantExpression.getSortOrder(),
- column.getMaxLength(), column.getScale(),column.getSortOrder(),
- table.rowKeyOrderOptimizable());
- if (overlapViewColumns.contains(column) && Bytes.compareTo(ptr.get(), ptr.getOffset(), ptr.getLength(), column.getViewConstant(), 0, column.getViewConstant().length-1) != 0) {
- throw new SQLExceptionInfo.Builder(
- SQLExceptionCode.CANNOT_UPDATE_VIEW_COLUMN)
- .setColumnName(column.getName().getString())
- .setMessage("value=" + constantExpression.toString()).build().buildException();
- }
- values[nodeIndex] = ByteUtil.copyKeyBytesIfNecessary(ptr);
- nodeIndex++;
- }
- // Add columns based on view
- for (PColumn column : addViewColumns) {
- if (IndexUtil.getViewConstantValue(column, ptr)) {
- values[nodeIndex++] = ByteUtil.copyKeyBytesIfNecessary(ptr);
- } else {
- throw new IllegalStateException();
- }
- }
- Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(1);
- IndexMaintainer indexMaintainer = null;
- byte[][] viewConstants = null;
- if (table.getIndexType() == IndexType.LOCAL) {
- PTable parentTable =
- statement
- .getConnection()
- .getMetaDataCache()
- .getTableRef(
- new PTableKey(statement.getConnection().getTenantId(),
- table.getParentName().getString())).getTable();
- indexMaintainer = table.getIndexMaintainer(parentTable, connection);
- viewConstants = IndexUtil.getViewConstants(parentTable);
- }
- setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, onDupKeyBytes, 0);
- return new MutationState(tableRef, mutation, 0, maxSize, maxSizeBytes, connection);
- }
-
- @Override
- public ExplainPlan getExplainPlan() throws SQLException {
- List<String> planSteps = Lists.newArrayListWithExpectedSize(2);
- if (context.getSequenceManager().getSequenceCount() > 0) {
- planSteps.add("CLIENT RESERVE " + context.getSequenceManager().getSequenceCount() + " SEQUENCES");
- }
- planSteps.add("PUT SINGLE ROW");
- return new ExplainPlan(planSteps);
- }
-
- @Override
- public Long getEstimatedRowsToScan() throws SQLException {
- return 0l;
- }
-
- @Override
- public Long getEstimatedBytesToScan() throws SQLException {
- return 0l;
- }
-
- @Override
- public Long getEstimateInfoTimestamp() throws SQLException {
- return 0l;
- }
- };
+ return new UpsertValuesMutationPlan(context, tableRef, nodeIndexOffset, constantExpressions,
+ allColumns, columnIndexes, overlapViewColumns, values, addViewColumns,
+ connection, pkSlotIndexes, useServerTimestamp, onDupKeyBytes, maxSize, maxSizeBytes);
}
private static boolean isRowTimestampSet(int[] pkSlotIndexes, PTable table) {
@@ -1214,4 +939,394 @@ public class UpsertCompiler {
}
}
}
+
+ private class ServerUpsertSelectMutationPlan implements MutationPlan {
+ private final QueryPlan queryPlan;
+ private final TableRef tableRef;
+ private final QueryPlan originalQueryPlan;
+ private final StatementContext context;
+ private final PhoenixConnection connection;
+ private final Scan scan;
+ private final QueryPlan aggPlan;
+ private final RowProjector aggProjector;
+ private final int maxSize;
+ private final int maxSizeBytes;
+
+ public ServerUpsertSelectMutationPlan(QueryPlan queryPlan, TableRef tableRef, QueryPlan originalQueryPlan,
+ StatementContext context, PhoenixConnection connection,
+ Scan scan, QueryPlan aggPlan, RowProjector aggProjector,
+ int maxSize, int maxSizeBytes) {
+ this.queryPlan = queryPlan;
+ this.tableRef = tableRef;
+ this.originalQueryPlan = originalQueryPlan;
+ this.context = context;
+ this.connection = connection;
+ this.scan = scan;
+ this.aggPlan = aggPlan;
+ this.aggProjector = aggProjector;
+ this.maxSize = maxSize;
+ this.maxSizeBytes = maxSizeBytes;
+ }
+
+ @Override
+ public ParameterMetaData getParameterMetaData() {
+ return queryPlan.getContext().getBindManager().getParameterMetaData();
+ }
+
+ @Override
+ public StatementContext getContext() {
+ return queryPlan.getContext();
+ }
+
+ @Override
+ public TableRef getTargetRef() {
+ return tableRef;
+ }
+
+ @Override
+ public QueryPlan getQueryPlan() {
+ return aggPlan;
+ }
+
+ @Override
+ public Set<TableRef> getSourceRefs() {
+ return originalQueryPlan.getSourceRefs();
+ }
+
+ @Override
+ public Operation getOperation() {
+ return operation;
+ }
+
+ @Override
+ public MutationState execute() throws SQLException {
+ ImmutableBytesWritable ptr = context.getTempPtr();
+ PTable table = tableRef.getTable();
+ table.getIndexMaintainers(ptr, context.getConnection());
+ byte[] txState = table.isTransactional() ?
+ connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY;
+
+ if (ptr.getLength() > 0) {
+ byte[] uuidValue = ServerCacheClient.generateId();
+ scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+ scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
+ scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+ }
+ ResultIterator iterator = aggPlan.iterator();
+ try {
+ Tuple row = iterator.next();
+ final long mutationCount = (Long) aggProjector.getColumnProjector(0).getValue(row,
+ PLong.INSTANCE, ptr);
+ return new MutationState(maxSize, maxSizeBytes, connection) {
+ @Override
+ public long getUpdateCount() {
+ return mutationCount;
+ }
+ };
+ } finally {
+ iterator.close();
+ }
+
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ List<String> queryPlanSteps = aggPlan.getExplainPlan().getPlanSteps();
+ List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+ planSteps.add("UPSERT ROWS");
+ planSteps.addAll(queryPlanSteps);
+ return new ExplainPlan(planSteps);
+ }
+
+ @Override
+ public Long getEstimatedRowsToScan() throws SQLException {
+ return aggPlan.getEstimatedRowsToScan();
+ }
+
+ @Override
+ public Long getEstimatedBytesToScan() throws SQLException {
+ return aggPlan.getEstimatedBytesToScan();
+ }
+
+ @Override
+ public Long getEstimateInfoTimestamp() throws SQLException {
+ return aggPlan.getEstimateInfoTimestamp();
+ }
+ }
+
+ private class UpsertValuesMutationPlan implements MutationPlan {
+ private final StatementContext context;
+ private final TableRef tableRef;
+ private final int nodeIndexOffset;
+ private final List<Expression> constantExpressions;
+ private final List<PColumn> allColumns;
+ private final int[] columnIndexes;
+ private final Set<PColumn> overlapViewColumns;
+ private final byte[][] values;
+ private final Set<PColumn> addViewColumns;
+ private final PhoenixConnection connection;
+ private final int[] pkSlotIndexes;
+ private final boolean useServerTimestamp;
+ private final byte[] onDupKeyBytes;
+ private final int maxSize;
+ private final int maxSizeBytes;
+
+ public UpsertValuesMutationPlan(StatementContext context, TableRef tableRef, int nodeIndexOffset,
+ List<Expression> constantExpressions, List<PColumn> allColumns,
+ int[] columnIndexes, Set<PColumn> overlapViewColumns, byte[][] values,
+ Set<PColumn> addViewColumns, PhoenixConnection connection,
+ int[] pkSlotIndexes, boolean useServerTimestamp, byte[] onDupKeyBytes,
+ int maxSize, int maxSizeBytes) {
+ this.context = context;
+ this.tableRef = tableRef;
+ this.nodeIndexOffset = nodeIndexOffset;
+ this.constantExpressions = constantExpressions;
+ this.allColumns = allColumns;
+ this.columnIndexes = columnIndexes;
+ this.overlapViewColumns = overlapViewColumns;
+ this.values = values;
+ this.addViewColumns = addViewColumns;
+ this.connection = connection;
+ this.pkSlotIndexes = pkSlotIndexes;
+ this.useServerTimestamp = useServerTimestamp;
+ this.onDupKeyBytes = onDupKeyBytes;
+ this.maxSize = maxSize;
+ this.maxSizeBytes = maxSizeBytes;
+ }
+
+ @Override
+ public ParameterMetaData getParameterMetaData() {
+ return context.getBindManager().getParameterMetaData();
+ }
+
+ @Override
+ public StatementContext getContext() {
+ return context;
+ }
+
+ @Override
+ public TableRef getTargetRef() {
+ return tableRef;
+ }
+
+ @Override
+ public QueryPlan getQueryPlan() {
+ return null;
+ }
+
+ @Override
+ public Set<TableRef> getSourceRefs() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Operation getOperation() {
+ return operation;
+ }
+
+ @Override
+ public MutationState execute() throws SQLException {
+ ImmutableBytesWritable ptr = context.getTempPtr();
+ final SequenceManager sequenceManager = context.getSequenceManager();
+ // Next evaluate all the expressions
+ int nodeIndex = nodeIndexOffset;
+ PTable table = tableRef.getTable();
+ Tuple tuple = sequenceManager.getSequenceCount() == 0 ? null :
+ sequenceManager.newSequenceTuple(null);
+ for (Expression constantExpression : constantExpressions) {
+ PColumn column = allColumns.get(columnIndexes[nodeIndex]);
+ constantExpression.evaluate(tuple, ptr);
+ Object value = null;
+ if (constantExpression.getDataType() != null) {
+ value = constantExpression.getDataType().toObject(ptr, constantExpression.getSortOrder(),
+ constantExpression.getMaxLength(), constantExpression.getScale());
+ if (!constantExpression.getDataType().isCoercibleTo(column.getDataType(), value)) {
+ throw TypeMismatchException.newException(
+ constantExpression.getDataType(), column.getDataType(), "expression: "
+ + constantExpression.toString() + " in column " + column);
+ }
+ if (!column.getDataType().isSizeCompatible(ptr, value, constantExpression.getDataType(),
+ constantExpression.getSortOrder(), constantExpression.getMaxLength(),
+ constantExpression.getScale(), column.getMaxLength(), column.getScale())) {
+ throw new SQLExceptionInfo.Builder(
+ SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString())
+ .setMessage("value=" + constantExpression.toString()).build().buildException();
+ }
+ }
+ column.getDataType().coerceBytes(ptr, value, constantExpression.getDataType(),
+ constantExpression.getMaxLength(), constantExpression.getScale(), constantExpression.getSortOrder(),
+ column.getMaxLength(), column.getScale(),column.getSortOrder(),
+ table.rowKeyOrderOptimizable());
+ if (overlapViewColumns.contains(column) && Bytes.compareTo(ptr.get(), ptr.getOffset(), ptr.getLength(), column.getViewConstant(), 0, column.getViewConstant().length-1) != 0) {
+ throw new SQLExceptionInfo.Builder(
+ SQLExceptionCode.CANNOT_UPDATE_VIEW_COLUMN)
+ .setColumnName(column.getName().getString())
+ .setMessage("value=" + constantExpression.toString()).build().buildException();
+ }
+ values[nodeIndex] = ByteUtil.copyKeyBytesIfNecessary(ptr);
+ nodeIndex++;
+ }
+ // Add columns based on view
+ for (PColumn column : addViewColumns) {
+ if (IndexUtil.getViewConstantValue(column, ptr)) {
+ values[nodeIndex++] = ByteUtil.copyKeyBytesIfNecessary(ptr);
+ } else {
+ throw new IllegalStateException();
+ }
+ }
+ Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(1);
+ IndexMaintainer indexMaintainer = null;
+ byte[][] viewConstants = null;
+ if (table.getIndexType() == IndexType.LOCAL) {
+ PTable parentTable =
+ statement
+ .getConnection()
+ .getMetaDataCache()
+ .getTableRef(
+ new PTableKey(statement.getConnection().getTenantId(),
+ table.getParentName().getString())).getTable();
+ indexMaintainer = table.getIndexMaintainer(parentTable, connection);
+ viewConstants = IndexUtil.getViewConstants(parentTable);
+ }
+ setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, onDupKeyBytes, 0);
+ return new MutationState(tableRef, mutation, 0, maxSize, maxSizeBytes, connection);
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ List<String> planSteps = Lists.newArrayListWithExpectedSize(2);
+ if (context.getSequenceManager().getSequenceCount() > 0) {
+ planSteps.add("CLIENT RESERVE " + context.getSequenceManager().getSequenceCount() + " SEQUENCES");
+ }
+ planSteps.add("PUT SINGLE ROW");
+ return new ExplainPlan(planSteps);
+ }
+
+ @Override
+ public Long getEstimatedRowsToScan() throws SQLException {
+ return 0l;
+ }
+
+ @Override
+ public Long getEstimatedBytesToScan() throws SQLException {
+ return 0l;
+ }
+
+ @Override
+ public Long getEstimateInfoTimestamp() throws SQLException {
+ return 0l;
+ }
+ }
+
+ private class ClientUpsertSelectMutationPlan implements MutationPlan {
+ private final QueryPlan queryPlan;
+ private final TableRef tableRef;
+ private final QueryPlan originalQueryPlan;
+ private final UpsertingParallelIteratorFactory parallelIteratorFactory;
+ private final RowProjector projector;
+ private final int[] columnIndexes;
+ private final int[] pkSlotIndexes;
+ private final boolean useServerTimestamp;
+ private final int maxSize;
+ private final int maxSizeBytes;
+
+ public ClientUpsertSelectMutationPlan(QueryPlan queryPlan, TableRef tableRef, QueryPlan originalQueryPlan, UpsertingParallelIteratorFactory parallelIteratorFactory, RowProjector projector, int[] columnIndexes, int[] pkSlotIndexes, boolean useServerTimestamp, int maxSize, int maxSizeBytes) {
+ this.queryPlan = queryPlan;
+ this.tableRef = tableRef;
+ this.originalQueryPlan = originalQueryPlan;
+ this.parallelIteratorFactory = parallelIteratorFactory;
+ this.projector = projector;
+ this.columnIndexes = columnIndexes;
+ this.pkSlotIndexes = pkSlotIndexes;
+ this.useServerTimestamp = useServerTimestamp;
+ this.maxSize = maxSize;
+ this.maxSizeBytes = maxSizeBytes;
+ }
+
+ @Override
+ public ParameterMetaData getParameterMetaData() {
+ return queryPlan.getContext().getBindManager().getParameterMetaData();
+ }
+
+ @Override
+ public StatementContext getContext() {
+ return queryPlan.getContext();
+ }
+
+ @Override
+ public TableRef getTargetRef() {
+ return tableRef;
+ }
+
+ @Override
+ public QueryPlan getQueryPlan() {
+ return queryPlan;
+ }
+
+ @Override
+ public Set<TableRef> getSourceRefs() {
+ return originalQueryPlan.getSourceRefs();
+ }
+
+ @Override
+ public Operation getOperation() {
+ return operation;
+ }
+
+ @Override
+ public MutationState execute() throws SQLException {
+ ResultIterator iterator = queryPlan.iterator();
+ if (parallelIteratorFactory == null) {
+ return upsertSelect(new StatementContext(statement), tableRef, projector, iterator, columnIndexes, pkSlotIndexes, useServerTimestamp, false);
+ }
+ try {
+ parallelIteratorFactory.setRowProjector(projector);
+ parallelIteratorFactory.setColumnIndexes(columnIndexes);
+ parallelIteratorFactory.setPkSlotIndexes(pkSlotIndexes);
+ Tuple tuple;
+ long totalRowCount = 0;
+ StatementContext context = queryPlan.getContext();
+ while ((tuple=iterator.next()) != null) {// Runs query
+ Cell kv = tuple.getValue(0);
+ totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault());
+ }
+ // Return total number of rows that have been updated. In the case of auto commit being off
+ // the mutations will all be in the mutation state of the current connection.
+ MutationState mutationState = new MutationState(maxSize, maxSizeBytes, statement.getConnection(), totalRowCount);
+ /*
+ * All the metrics collected for measuring the reads done by the parallel mutating iterators
+ * are included in the ReadMetricHolder of the statement context. Include these metrics in the
+ * returned mutation state so they can be published on commit.
+ */
+ mutationState.setReadMetricQueue(context.getReadMetricsQueue());
+ return mutationState;
+ } finally {
+ iterator.close();
+ }
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ List<String> queryPlanSteps = queryPlan.getExplainPlan().getPlanSteps();
+ List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+ planSteps.add("UPSERT SELECT");
+ planSteps.addAll(queryPlanSteps);
+ return new ExplainPlan(planSteps);
+ }
+
+ @Override
+ public Long getEstimatedRowsToScan() throws SQLException {
+ return queryPlan.getEstimatedRowsToScan();
+ }
+
+ @Override
+ public Long getEstimatedBytesToScan() throws SQLException {
+ return queryPlan.getEstimatedBytesToScan();
+ }
+
+ @Override
+ public Long getEstimateInfoTimestamp() throws SQLException {
+ return queryPlan.getEstimateInfoTimestamp();
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d8a6bc3/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index d35cce1..174e643 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -1313,11 +1313,12 @@ public class PhoenixStatement implements Statement, SQLCloseable {
public ExplainPlan getExplainPlan() throws SQLException {
return new ExplainPlan(Collections.singletonList("EXECUTE UPGRADE"));
}
-
+
@Override
- public StatementContext getContext() {
- return new StatementContext(stmt);
- }
+ public QueryPlan getQueryPlan() { return null; }
+
+ @Override
+ public StatementContext getContext() { return new StatementContext(stmt); }
@Override
public TableRef getTargetRef() {