Posted to commits@phoenix.apache.org by ma...@apache.org on 2016/05/05 14:41:55 UTC

phoenix git commit: Add transaction-related code for query and DML (PHOENIX-2197 Support DML in Phoenix/Calcite integration)

Repository: phoenix
Updated Branches:
  refs/heads/calcite 739b68cf2 -> 7c662fd35


Add transaction-related code for query and DML (PHOENIX-2197 Support DML in Phoenix/Calcite integration)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7c662fd3
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7c662fd3
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7c662fd3

Branch: refs/heads/calcite
Commit: 7c662fd35d8095c1f1096a2fd74eb0bcb58659ab
Parents: 739b68c
Author: maryannxue <ma...@gmail.com>
Authored: Thu May 5 10:41:47 2016 -0400
Committer: maryannxue <ma...@gmail.com>
Committed: Thu May 5 10:41:47 2016 -0400

----------------------------------------------------------------------
 .../apache/phoenix/calcite/BaseCalciteIT.java   |   1 +
 .../apache/phoenix/calcite/CalciteRuntime.java  |  46 ++++-
 .../phoenix/calcite/rel/PhoenixTableModify.java | 200 ++++++++++---------
 3 files changed, 147 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
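At a high level, the patch threads Phoenix's client-side transaction machinery
through both Calcite execution paths: queries must flush any uncommitted writes
for their source tables before opening an iterator, and DML must start a
transaction on transactional targets, join its resulting MutationState back into
the connection's state, and honor autocommit. A minimal sketch of the query-side
ordering contract follows (the wrapper class and method name are hypothetical;
every Phoenix call appears verbatim in the diff below):

    import java.sql.SQLException;
    import java.util.Iterator;

    import org.apache.phoenix.compile.QueryPlan;
    import org.apache.phoenix.execute.MutationState;
    import org.apache.phoenix.iterate.ResultIterator;
    import org.apache.phoenix.jdbc.PhoenixConnection;
    import org.apache.phoenix.schema.TableRef;

    final class TransactionAwareQuery {
        static ResultIterator open(QueryPlan plan) throws SQLException {
            PhoenixConnection connection = plan.getContext().getConnection();
            MutationState state = connection.getMutationState();
            // Flush client-buffered writes for the source tables first, so a
            // transactional scan can see the connection's own uncommitted data.
            Iterator<TableRef> sourceRefs = plan.getSourceRefs().iterator();
            state.sendUncommitted(sourceRefs);
            // Only after that is it safe to open the scan.
            return plan.iterator();
        }
    }

This ordering is why the enumerator in CalciteRuntime below defers
plan.iterator() from construction time to the first moveNext() call.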


http://git-wip-us.apache.org/repos/asf/phoenix/blob/7c662fd3/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java
index cb7d01d..dc21809 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java
@@ -122,6 +122,7 @@ public class BaseCalciteIT extends BaseClientManagedTimeIT {
         public void close() {
             if (connection != null) {
                 try {
+                    connection.commit();
                     connection.close();
                 } catch (SQLException e) {
                     throw new RuntimeException(e);

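The test-harness change above matters because, with autocommit off, Phoenix
buffers UPSERTs client-side in the connection's MutationState and silently
discards them if the connection closes uncommitted. A hedged standalone
illustration (the JDBC URL and table are hypothetical):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;

    public final class CommitBeforeClose {
        public static void main(String[] args) throws SQLException {
            try (Connection conn =
                    DriverManager.getConnection("jdbc:phoenix:localhost")) {
                conn.setAutoCommit(false);
                try (PreparedStatement ps = conn.prepareStatement(
                        "UPSERT INTO T (PK, V) VALUES (?, ?)")) {
                    ps.setInt(1, 1);
                    ps.setString(2, "a");
                    ps.executeUpdate();
                }
                // Without this, the buffered row never reaches HBase.
                conn.commit();
            }
        }
    }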
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7c662fd3/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteRuntime.java
index a3feffa..df036a9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteRuntime.java
@@ -8,9 +8,12 @@ import org.apache.phoenix.compile.ColumnProjector;
 import org.apache.phoenix.compile.MutationPlan;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.compile.StatementPlan;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PDate;
@@ -34,6 +37,7 @@ import org.apache.phoenix.schema.types.PhoenixArray;
 
 import java.sql.SQLException;
 import java.sql.Timestamp;
+import java.util.Iterator;
 
 /**
  * Methods used by code generated by Calcite.
@@ -60,13 +64,26 @@ public class CalciteRuntime {
         };
     }
 
-    public static Enumerator<Object> toEnumerator(QueryPlan plan) throws SQLException {
-        final ResultIterator iterator = plan.iterator();
+    public static Enumerator<Object> toEnumerator(final QueryPlan plan) throws SQLException {
         final RowProjector rowProjector = plan.getProjector();
         final int count = rowProjector.getColumnCount();
         return new Enumerator<Object>() {
+            ResultIterator iterator = null;
             Object current;
-            private final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+            final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+            
+            void init() throws SQLException {
+                final StatementContext context = plan.getContext();
+                final PhoenixConnection connection = context.getConnection();
+                Iterator<TableRef> tableRefs = plan.getSourceRefs().iterator();
+                connection.getMutationState().sendUncommitted(tableRefs);
+                iterator = plan.iterator();
+                context.getOverallQueryMetrics().startQuery();
+                if (connection.getAutoCommit()) {
+                    connection.commit();
+                }
+                connection.incrementStatementExecutionCounter();                
+            }
 
             @Override
             public Object current() {
@@ -76,6 +93,9 @@ public class CalciteRuntime {
             @Override
             public boolean moveNext() {
                 try {
+                    if (iterator == null) {
+                        init();
+                    }
                     final Tuple tuple = iterator.next();
                     if (tuple == null) {
                         current = null;
@@ -179,9 +199,23 @@ public class CalciteRuntime {
                 }
 
                 try {
-                    MutationState state = plan.execute();
-                    updateCount = state.getUpdateCount();
-                    state.commit();
+                    final PhoenixConnection connection = plan.getContext().getConnection();
+                    final MutationState state = connection.getMutationState();
+                    if (plan.getTargetRef() != null
+                            && plan.getTargetRef().getTable() != null
+                            && plan.getTargetRef().getTable().isTransactional()) {
+                        state.startTransaction();
+                    }
+                    final Iterator<TableRef> tableRefs = plan.getSourceRefs().iterator();
+                    state.sendUncommitted(tableRefs);
+                    state.checkpointIfNeccessary(plan);
+                    final MutationState lastState = plan.execute();
+                    state.join(lastState);
+                    if (connection.getAutoCommit()) {
+                        connection.commit();
+                    }
+                    updateCount = lastState.getUpdateCount();
+                    connection.incrementStatementExecutionCounter();
                 } catch (SQLException e) {
                     throw new RuntimeException(e);
                 }

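Condensed, the new DML path in CalciteRuntime runs the sequence below. The
wrapper class is hypothetical; the call order mirrors the hunk above, including
Phoenix's own spelling of checkpointIfNeccessary:

    import java.sql.SQLException;

    import org.apache.phoenix.compile.MutationPlan;
    import org.apache.phoenix.execute.MutationState;
    import org.apache.phoenix.jdbc.PhoenixConnection;

    final class TransactionAwareDml {
        static long execute(MutationPlan plan) throws SQLException {
            PhoenixConnection connection = plan.getContext().getConnection();
            MutationState state = connection.getMutationState();
            // Transactional targets need an open transaction before any write.
            if (plan.getTargetRef() != null
                    && plan.getTargetRef().getTable() != null
                    && plan.getTargetRef().getTable().isTransactional()) {
                state.startTransaction();
            }
            // Make earlier uncommitted writes visible to the source scan, then
            // checkpoint so an UPSERT SELECT over its own target does not read
            // back the rows it is writing.
            state.sendUncommitted(plan.getSourceRefs().iterator());
            state.checkpointIfNeccessary(plan);
            MutationState lastState = plan.execute();
            state.join(lastState);
            if (connection.getAutoCommit()) {
                connection.commit();
            }
            return lastState.getUpdateCount();
        }
    }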
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7c662fd3/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
index 6292070..6629cb4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
@@ -13,12 +13,16 @@ import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.prepare.Prepare.CatalogReader;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.TableModify;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.calcite.PhoenixTable;
+import org.apache.phoenix.compile.ColumnResolver;
 import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.FromCompiler;
 import org.apache.phoenix.compile.MutationPlan;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.SequenceManager;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.compile.StatementPlan;
 import org.apache.phoenix.exception.SQLExceptionCode;
@@ -30,6 +34,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
@@ -73,113 +78,120 @@ public class PhoenixTableModify extends TableModify implements PhoenixRel {
         
         final QueryPlan queryPlan = implementor.visitInput(0, (PhoenixQueryRel) input);
         final RowProjector projector = implementor.getTableMapping().createRowProjector();
+
         final PhoenixTable targetTable = getTable().unwrap(PhoenixTable.class);
         final PhoenixConnection connection = targetTable.pc;
         final TableRef targetTableRef = targetTable.tableMapping.getTableRef();
-        final List<PColumn> mappedColumns = targetTable.tableMapping.getMappedColumns();
-        final int[] columnIndexes = new int[mappedColumns.size()];
-        final int[] pkSlotIndexes = new int[mappedColumns.size()];
-        for (int i = 0; i < columnIndexes.length; i++) {
-            PColumn column = mappedColumns.get(i);
-            if (SchemaUtil.isPKColumn(column)) {
-                pkSlotIndexes[i] = column.getPosition();
-            }
-            columnIndexes[i] = column.getPosition();
-        }
-        // TODO
-        final boolean useServerTimestamp = false;
-        
-        return new MutationPlan() {
-            @Override
-            public ParameterMetaData getParameterMetaData() {
-                return queryPlan.getContext().getBindManager().getParameterMetaData();
-            }
+        try (PhoenixStatement stmt = new PhoenixStatement(connection)) {
+            final ColumnResolver resolver = FromCompiler.getResolver(targetTableRef);
+            final StatementContext context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt));
 
-            @Override
-            public StatementContext getContext() {
-                return queryPlan.getContext();
+            final List<PColumn> mappedColumns = targetTable.tableMapping.getMappedColumns();
+            final int[] columnIndexes = new int[mappedColumns.size()];
+            final int[] pkSlotIndexes = new int[mappedColumns.size()];
+            for (int i = 0; i < columnIndexes.length; i++) {
+                PColumn column = mappedColumns.get(i);
+                if (SchemaUtil.isPKColumn(column)) {
+                    pkSlotIndexes[i] = column.getPosition();
+                }
+                columnIndexes[i] = column.getPosition();
             }
+            // TODO
+            final boolean useServerTimestamp = false;
+            
+            return new MutationPlan() {
+                @Override
+                public ParameterMetaData getParameterMetaData() {
+                    return queryPlan.getContext().getBindManager().getParameterMetaData();
+                }
 
-            @Override
-            public TableRef getTargetRef() {
-                return targetTableRef;
-            }
+                @Override
+                public StatementContext getContext() {
+                    return context;
+                }
 
-            @Override
-            public Set<TableRef> getSourceRefs() {
-                // TODO return originalQueryPlan.getSourceRefs();
-                return queryPlan.getSourceRefs();
-            }
+                @Override
+                public TableRef getTargetRef() {
+                    return targetTableRef;
+                }
 
-            @Override
-            public org.apache.phoenix.jdbc.PhoenixStatement.Operation getOperation() {
-                return org.apache.phoenix.jdbc.PhoenixStatement.Operation.UPSERT;
-            }
+                @Override
+                public Set<TableRef> getSourceRefs() {
+                    // TODO return originalQueryPlan.getSourceRefs();
+                    return queryPlan.getSourceRefs();
+                }
 
-            @Override
-            public MutationState execute() throws SQLException {
-                ResultIterator iterator = queryPlan.iterator();
-                // simplest version, no run-on-server, no pipelined update
-                StatementContext childContext = queryPlan.getContext();
-                ConnectionQueryServices services = connection.getQueryServices();
-                int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,
-                        QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
-                int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
-                boolean isAutoCommit = connection.getAutoCommit();
-                byte[][] values = new byte[columnIndexes.length][];
-                int rowCount = 0;
-                Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize);
-                PTable table = targetTableRef.getTable();
-                try (ResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) {
-                    ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-                    while (rs.next()) {
-                        for (int i = 0; i < values.length; i++) {
-                            PColumn column = table.getColumns().get(columnIndexes[i]);
-                            byte[] bytes = rs.getBytes(i + 1);
-                            ptr.set(bytes == null ? ByteUtil.EMPTY_BYTE_ARRAY : bytes);
-                            Object value = rs.getObject(i + 1);
-                            int rsPrecision = rs.getMetaData().getPrecision(i + 1);
-                            Integer precision = rsPrecision == 0 ? null : rsPrecision;
-                            int rsScale = rs.getMetaData().getScale(i + 1);
-                            Integer scale = rsScale == 0 ? null : rsScale;
-                            // We are guaranteed that the two columns will have compatible types,
-                            // as we checked that before.
-                            if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(), precision, scale,
-                                    column.getMaxLength(), column.getScale())) { throw new SQLExceptionInfo.Builder(
-                                    SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString())
-                                    .setMessage("value=" + column.getDataType().toStringLiteral(ptr, null)).build()
-                                    .buildException(); }
-                            column.getDataType().coerceBytes(ptr, value, column.getDataType(), 
-                                    precision, scale, SortOrder.getDefault(), 
-                                    column.getMaxLength(), column.getScale(), column.getSortOrder(),
-                                    table.rowKeyOrderOptimizable());
-                            values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr);
-                        }
-                        setValues(values, pkSlotIndexes, columnIndexes, table, mutation, connection, useServerTimestamp);
-                        rowCount++;
-                        // Commit a batch if auto commit is true and we're at our batch size
-                        if (isAutoCommit && rowCount % batchSize == 0) {
-                            MutationState state = new MutationState(targetTableRef, mutation, 0, maxSize, connection);
-                            connection.getMutationState().join(state);
-                            connection.getMutationState().send();
-                            mutation.clear();
+                @Override
+                public org.apache.phoenix.jdbc.PhoenixStatement.Operation getOperation() {
+                    return org.apache.phoenix.jdbc.PhoenixStatement.Operation.UPSERT;
+                }
+
+                @Override
+                public MutationState execute() throws SQLException {
+                    ResultIterator iterator = queryPlan.iterator();
+                    // simplest version, no run-on-server, no pipelined update
+                    StatementContext childContext = queryPlan.getContext();
+                    ConnectionQueryServices services = connection.getQueryServices();
+                    int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,
+                            QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+                    int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
+                    boolean isAutoCommit = connection.getAutoCommit();
+                    byte[][] values = new byte[columnIndexes.length][];
+                    int rowCount = 0;
+                    Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize);
+                    PTable table = targetTableRef.getTable();
+                    try (ResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) {
+                        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+                        while (rs.next()) {
+                            for (int i = 0; i < values.length; i++) {
+                                PColumn column = table.getColumns().get(columnIndexes[i]);
+                                byte[] bytes = rs.getBytes(i + 1);
+                                ptr.set(bytes == null ? ByteUtil.EMPTY_BYTE_ARRAY : bytes);
+                                Object value = rs.getObject(i + 1);
+                                int rsPrecision = rs.getMetaData().getPrecision(i + 1);
+                                Integer precision = rsPrecision == 0 ? null : rsPrecision;
+                                int rsScale = rs.getMetaData().getScale(i + 1);
+                                Integer scale = rsScale == 0 ? null : rsScale;
+                                // We are guaranteed that the two columns will have compatible types,
+                                // as we checked that before.
+                                if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(), precision, scale,
+                                        column.getMaxLength(), column.getScale())) { throw new SQLExceptionInfo.Builder(
+                                        SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString())
+                                        .setMessage("value=" + column.getDataType().toStringLiteral(ptr, null)).build()
+                                        .buildException(); }
+                                column.getDataType().coerceBytes(ptr, value, column.getDataType(), 
+                                        precision, scale, SortOrder.getDefault(), 
+                                        column.getMaxLength(), column.getScale(), column.getSortOrder(),
+                                        table.rowKeyOrderOptimizable());
+                                values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr);
+                            }
+                            setValues(values, pkSlotIndexes, columnIndexes, table, mutation, connection, useServerTimestamp);
+                            rowCount++;
+                            // Commit a batch if auto commit is true and we're at our batch size
+                            if (isAutoCommit && rowCount % batchSize == 0) {
+                                MutationState state = new MutationState(targetTableRef, mutation, 0, maxSize, connection);
+                                connection.getMutationState().join(state);
+                                connection.getMutationState().send();
+                                mutation.clear();
+                            }
                         }
+                        // If auto commit is true, this last batch will be committed upon return
+                        return new MutationState(targetTableRef, mutation, rowCount / batchSize * batchSize, maxSize, connection);
                     }
-                    // If auto commit is true, this last batch will be committed upon return
-                    return new MutationState(targetTableRef, mutation, rowCount / batchSize * batchSize, maxSize, connection);
                 }
-            }
 
-            @Override
-            public ExplainPlan getExplainPlan() throws SQLException {
-                List<String> queryPlanSteps =  queryPlan.getExplainPlan().getPlanSteps();
-                List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
-                planSteps.add("UPSERT SELECT");
-                planSteps.addAll(queryPlanSteps);
-                return new ExplainPlan(planSteps);
-            }
-            
-        };
+                @Override
+                public ExplainPlan getExplainPlan() throws SQLException {
+                    List<String> queryPlanSteps =  queryPlan.getExplainPlan().getPlanSteps();
+                    List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+                    planSteps.add("UPSERT SELECT");
+                    planSteps.addAll(queryPlanSteps);
+                    return new ExplainPlan(planSteps);
+                }                
+            };
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
     }
     
     private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,RowMutationState> mutation, PhoenixConnection connection, boolean useServerTimestamp) {
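The rest of the PhoenixTableModify diff (truncated here) re-indents the batched
UPSERT SELECT loop without changing its behavior; the substantive change is that
the MutationPlan now gets its own StatementContext, built from a fresh
PhoenixStatement and a resolver on the target table, instead of aliasing the
child query's context. One detail of execute() worth spelling out is the size
offset passed to the returned MutationState: integer division floors rowCount
to the rows already flushed in full batches, so the overall update count still
includes them while the trailing state carries only the residual rows. A tiny
worked example (values are illustrative):

    public final class BatchMath {
        public static void main(String[] args) {
            int batchSize = 100;          // min(mutateBatchSize, maxSize)
            int rowCount = 250;           // rows seen by the upsert loop
            int alreadySent = rowCount / batchSize * batchSize; // 200
            int residual = rowCount - alreadySent;              // 50
            System.out.println(alreadySent + " rows flushed inside the loop, "
                    + residual + " rows carried in the returned MutationState");
        }
    }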