You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2016/05/05 14:41:55 UTC
phoenix git commit: Add transaction related code for query and DML
(PHOENIX-2197 Support DML in Phoenix/Calcite integration)
Repository: phoenix
Updated Branches:
refs/heads/calcite 739b68cf2 -> 7c662fd35
Add transaction related code for query and DML (PHOENIX-2197 Support DML in Phoenix/Calcite integration)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7c662fd3
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7c662fd3
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7c662fd3
Branch: refs/heads/calcite
Commit: 7c662fd35d8095c1f1096a2fd74eb0bcb58659ab
Parents: 739b68c
Author: maryannxue <ma...@gmail.com>
Authored: Thu May 5 10:41:47 2016 -0400
Committer: maryannxue <ma...@gmail.com>
Committed: Thu May 5 10:41:47 2016 -0400
----------------------------------------------------------------------
.../apache/phoenix/calcite/BaseCalciteIT.java | 1 +
.../apache/phoenix/calcite/CalciteRuntime.java | 46 ++++-
.../phoenix/calcite/rel/PhoenixTableModify.java | 200 ++++++++++---------
3 files changed, 147 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7c662fd3/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java
index cb7d01d..dc21809 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java
@@ -122,6 +122,7 @@ public class BaseCalciteIT extends BaseClientManagedTimeIT {
public void close() {
if (connection != null) {
try {
+ connection.commit();
connection.close();
} catch (SQLException e) {
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7c662fd3/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteRuntime.java
index a3feffa..df036a9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteRuntime.java
@@ -8,9 +8,12 @@ import org.apache.phoenix.compile.ColumnProjector;
import org.apache.phoenix.compile.MutationPlan;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.compile.StatementPlan;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PDate;
@@ -34,6 +37,7 @@ import org.apache.phoenix.schema.types.PhoenixArray;
import java.sql.SQLException;
import java.sql.Timestamp;
+import java.util.Iterator;
/**
* Methods used by code generated by Calcite.
@@ -60,13 +64,26 @@ public class CalciteRuntime {
};
}
- public static Enumerator<Object> toEnumerator(QueryPlan plan) throws SQLException {
- final ResultIterator iterator = plan.iterator();
+ public static Enumerator<Object> toEnumerator(final QueryPlan plan) throws SQLException {
final RowProjector rowProjector = plan.getProjector();
final int count = rowProjector.getColumnCount();
return new Enumerator<Object>() {
+ ResultIterator iterator = null;
Object current;
- private final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+
+ void init() throws SQLException {
+ final StatementContext context = plan.getContext();
+ final PhoenixConnection connection = context.getConnection();
+ Iterator<TableRef> tableRefs = plan.getSourceRefs().iterator();
+ connection.getMutationState().sendUncommitted(tableRefs);
+ iterator = plan.iterator();
+ context.getOverallQueryMetrics().startQuery();
+ if (connection.getAutoCommit()) {
+ connection.commit();
+ }
+ connection.incrementStatementExecutionCounter();
+ }
@Override
public Object current() {
@@ -76,6 +93,9 @@ public class CalciteRuntime {
@Override
public boolean moveNext() {
try {
+ if (iterator == null) {
+ init();
+ }
final Tuple tuple = iterator.next();
if (tuple == null) {
current = null;
@@ -179,9 +199,23 @@ public class CalciteRuntime {
}
try {
- MutationState state = plan.execute();
- updateCount = state.getUpdateCount();
- state.commit();
+ final PhoenixConnection connection = plan.getContext().getConnection();
+ final MutationState state = connection.getMutationState();
+ if (plan.getTargetRef() != null
+ && plan.getTargetRef().getTable() != null
+ && plan.getTargetRef().getTable().isTransactional()) {
+ state.startTransaction();
+ }
+ final Iterator<TableRef> tableRefs = plan.getSourceRefs().iterator();
+ state.sendUncommitted(tableRefs);
+ state.checkpointIfNeccessary(plan);
+ final MutationState lastState = plan.execute();
+ state.join(lastState);
+ if (connection.getAutoCommit()) {
+ connection.commit();
+ }
+ updateCount = lastState.getUpdateCount();
+ connection.incrementStatementExecutionCounter();
} catch (SQLException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7c662fd3/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
index 6292070..6629cb4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
@@ -13,12 +13,16 @@ import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.prepare.Prepare.CatalogReader;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableModify;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.calcite.PhoenixTable;
+import org.apache.phoenix.compile.ColumnResolver;
import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.FromCompiler;
import org.apache.phoenix.compile.MutationPlan;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.SequenceManager;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.compile.StatementPlan;
import org.apache.phoenix.exception.SQLExceptionCode;
@@ -30,6 +34,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
@@ -73,113 +78,120 @@ public class PhoenixTableModify extends TableModify implements PhoenixRel {
final QueryPlan queryPlan = implementor.visitInput(0, (PhoenixQueryRel) input);
final RowProjector projector = implementor.getTableMapping().createRowProjector();
+
final PhoenixTable targetTable = getTable().unwrap(PhoenixTable.class);
final PhoenixConnection connection = targetTable.pc;
final TableRef targetTableRef = targetTable.tableMapping.getTableRef();
- final List<PColumn> mappedColumns = targetTable.tableMapping.getMappedColumns();
- final int[] columnIndexes = new int[mappedColumns.size()];
- final int[] pkSlotIndexes = new int[mappedColumns.size()];
- for (int i = 0; i < columnIndexes.length; i++) {
- PColumn column = mappedColumns.get(i);
- if (SchemaUtil.isPKColumn(column)) {
- pkSlotIndexes[i] = column.getPosition();
- }
- columnIndexes[i] = column.getPosition();
- }
- // TODO
- final boolean useServerTimestamp = false;
-
- return new MutationPlan() {
- @Override
- public ParameterMetaData getParameterMetaData() {
- return queryPlan.getContext().getBindManager().getParameterMetaData();
- }
+ try (PhoenixStatement stmt = new PhoenixStatement(connection)) {
+ final ColumnResolver resolver = FromCompiler.getResolver(targetTableRef);
+ final StatementContext context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt));
- @Override
- public StatementContext getContext() {
- return queryPlan.getContext();
+ final List<PColumn> mappedColumns = targetTable.tableMapping.getMappedColumns();
+ final int[] columnIndexes = new int[mappedColumns.size()];
+ final int[] pkSlotIndexes = new int[mappedColumns.size()];
+ for (int i = 0; i < columnIndexes.length; i++) {
+ PColumn column = mappedColumns.get(i);
+ if (SchemaUtil.isPKColumn(column)) {
+ pkSlotIndexes[i] = column.getPosition();
+ }
+ columnIndexes[i] = column.getPosition();
}
+ // TODO
+ final boolean useServerTimestamp = false;
+
+ return new MutationPlan() {
+ @Override
+ public ParameterMetaData getParameterMetaData() {
+ return queryPlan.getContext().getBindManager().getParameterMetaData();
+ }
- @Override
- public TableRef getTargetRef() {
- return targetTableRef;
- }
+ @Override
+ public StatementContext getContext() {
+ return context;
+ }
- @Override
- public Set<TableRef> getSourceRefs() {
- // TODO return originalQueryPlan.getSourceRefs();
- return queryPlan.getSourceRefs();
- }
+ @Override
+ public TableRef getTargetRef() {
+ return targetTableRef;
+ }
- @Override
- public org.apache.phoenix.jdbc.PhoenixStatement.Operation getOperation() {
- return org.apache.phoenix.jdbc.PhoenixStatement.Operation.UPSERT;
- }
+ @Override
+ public Set<TableRef> getSourceRefs() {
+ // TODO return originalQueryPlan.getSourceRefs();
+ return queryPlan.getSourceRefs();
+ }
- @Override
- public MutationState execute() throws SQLException {
- ResultIterator iterator = queryPlan.iterator();
- // simplest version, no run-on-server, no pipelined update
- StatementContext childContext = queryPlan.getContext();
- ConnectionQueryServices services = connection.getQueryServices();
- int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,
- QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
- int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
- boolean isAutoCommit = connection.getAutoCommit();
- byte[][] values = new byte[columnIndexes.length][];
- int rowCount = 0;
- Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize);
- PTable table = targetTableRef.getTable();
- try (ResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) {
- ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- while (rs.next()) {
- for (int i = 0; i < values.length; i++) {
- PColumn column = table.getColumns().get(columnIndexes[i]);
- byte[] bytes = rs.getBytes(i + 1);
- ptr.set(bytes == null ? ByteUtil.EMPTY_BYTE_ARRAY : bytes);
- Object value = rs.getObject(i + 1);
- int rsPrecision = rs.getMetaData().getPrecision(i + 1);
- Integer precision = rsPrecision == 0 ? null : rsPrecision;
- int rsScale = rs.getMetaData().getScale(i + 1);
- Integer scale = rsScale == 0 ? null : rsScale;
- // We are guaranteed that the two column will have compatible types,
- // as we checked that before.
- if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(), precision, scale,
- column.getMaxLength(), column.getScale())) { throw new SQLExceptionInfo.Builder(
- SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString())
- .setMessage("value=" + column.getDataType().toStringLiteral(ptr, null)).build()
- .buildException(); }
- column.getDataType().coerceBytes(ptr, value, column.getDataType(),
- precision, scale, SortOrder.getDefault(),
- column.getMaxLength(), column.getScale(), column.getSortOrder(),
- table.rowKeyOrderOptimizable());
- values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr);
- }
- setValues(values, pkSlotIndexes, columnIndexes, table, mutation, connection, useServerTimestamp);
- rowCount++;
- // Commit a batch if auto commit is true and we're at our batch size
- if (isAutoCommit && rowCount % batchSize == 0) {
- MutationState state = new MutationState(targetTableRef, mutation, 0, maxSize, connection);
- connection.getMutationState().join(state);
- connection.getMutationState().send();
- mutation.clear();
+ @Override
+ public org.apache.phoenix.jdbc.PhoenixStatement.Operation getOperation() {
+ return org.apache.phoenix.jdbc.PhoenixStatement.Operation.UPSERT;
+ }
+
+ @Override
+ public MutationState execute() throws SQLException {
+ ResultIterator iterator = queryPlan.iterator();
+ // simplest version, no run-on-server, no pipelined update
+ StatementContext childContext = queryPlan.getContext();
+ ConnectionQueryServices services = connection.getQueryServices();
+ int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,
+ QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+ int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
+ boolean isAutoCommit = connection.getAutoCommit();
+ byte[][] values = new byte[columnIndexes.length][];
+ int rowCount = 0;
+ Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize);
+ PTable table = targetTableRef.getTable();
+ try (ResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) {
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ while (rs.next()) {
+ for (int i = 0; i < values.length; i++) {
+ PColumn column = table.getColumns().get(columnIndexes[i]);
+ byte[] bytes = rs.getBytes(i + 1);
+ ptr.set(bytes == null ? ByteUtil.EMPTY_BYTE_ARRAY : bytes);
+ Object value = rs.getObject(i + 1);
+ int rsPrecision = rs.getMetaData().getPrecision(i + 1);
+ Integer precision = rsPrecision == 0 ? null : rsPrecision;
+ int rsScale = rs.getMetaData().getScale(i + 1);
+ Integer scale = rsScale == 0 ? null : rsScale;
+ // We are guaranteed that the two column will have compatible types,
+ // as we checked that before.
+ if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(), precision, scale,
+ column.getMaxLength(), column.getScale())) { throw new SQLExceptionInfo.Builder(
+ SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString())
+ .setMessage("value=" + column.getDataType().toStringLiteral(ptr, null)).build()
+ .buildException(); }
+ column.getDataType().coerceBytes(ptr, value, column.getDataType(),
+ precision, scale, SortOrder.getDefault(),
+ column.getMaxLength(), column.getScale(), column.getSortOrder(),
+ table.rowKeyOrderOptimizable());
+ values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr);
+ }
+ setValues(values, pkSlotIndexes, columnIndexes, table, mutation, connection, useServerTimestamp);
+ rowCount++;
+ // Commit a batch if auto commit is true and we're at our batch size
+ if (isAutoCommit && rowCount % batchSize == 0) {
+ MutationState state = new MutationState(targetTableRef, mutation, 0, maxSize, connection);
+ connection.getMutationState().join(state);
+ connection.getMutationState().send();
+ mutation.clear();
+ }
}
+ // If auto commit is true, this last batch will be committed upon return
+ return new MutationState(targetTableRef, mutation, rowCount / batchSize * batchSize, maxSize, connection);
}
- // If auto commit is true, this last batch will be committed upon return
- return new MutationState(targetTableRef, mutation, rowCount / batchSize * batchSize, maxSize, connection);
}
- }
- @Override
- public ExplainPlan getExplainPlan() throws SQLException {
- List<String> queryPlanSteps = queryPlan.getExplainPlan().getPlanSteps();
- List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
- planSteps.add("UPSERT SELECT");
- planSteps.addAll(queryPlanSteps);
- return new ExplainPlan(planSteps);
- }
-
- };
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ List<String> queryPlanSteps = queryPlan.getExplainPlan().getPlanSteps();
+ List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+ planSteps.add("UPSERT SELECT");
+ planSteps.addAll(queryPlanSteps);
+ return new ExplainPlan(planSteps);
+ }
+ };
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
}
private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,RowMutationState> mutation, PhoenixConnection connection, boolean useServerTimestamp) {