You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2017/01/27 01:19:07 UTC

[23/26] phoenix git commit: PHOENIX-3351 Implement TODOs in PhoenixTableModify#upsert to allow writes to tenant specific tables (Rajeshbabu)

PHOENIX-3351 Implement TODOs in PhoenixTableModify#upsert to allow writes to tenant specific tables (Rajeshbabu)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b9323e1d
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b9323e1d
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b9323e1d

Branch: refs/heads/calcite
Commit: b9323e1d30ba6b449f059b86ae7b8157de16b13d
Parents: 9b7f3ca
Author: Rajeshbabu Chintaguntla <ra...@apache.org>
Authored: Mon Jan 23 15:05:37 2017 +0530
Committer: Rajeshbabu Chintaguntla <ra...@apache.org>
Committed: Mon Jan 23 15:05:37 2017 +0530

----------------------------------------------------------------------
 .../apache/phoenix/compile/UpsertCompiler.java  | 44 ++++++++++++++------
 1 file changed, 32 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/b9323e1d/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 8837445..32ce6ad 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -98,6 +98,7 @@ import org.apache.phoenix.schema.TypeMismatchException;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PSmallint;
 import org.apache.phoenix.schema.types.PTimestamp;
 import org.apache.phoenix.schema.types.PUnsignedLong;
 import org.apache.phoenix.schema.types.PVarbinary;
@@ -116,7 +117,7 @@ public class UpsertCompiler {
     private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes,
             PTable table, Map<ImmutableBytesPtr, RowMutationState> mutation,
             PhoenixStatement statement, boolean useServerTimestamp, IndexMaintainer maintainer,
-            byte[][] viewConstants, byte[] onDupKeyBytes) throws SQLException {
+            byte[][] viewConstants, byte[] onDupKeyBytes, int numSplColumns) throws SQLException {
         Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length);
         byte[][] pkValues = new byte[table.getPKColumns().size()][];
         // If the table uses salting, the first byte is the salting byte, set to an empty array
@@ -124,10 +125,13 @@ public class UpsertCompiler {
         if (table.getBucketNum() != null) {
             pkValues[0] = new byte[] {0};
         }
+        for(int i = 0; i < numSplColumns; i++) {
+            pkValues[i] = values[i];
+        }
         Long rowTimestamp = null; // case when the table doesn't have a row timestamp column
         RowTimestampColInfo rowTsColInfo = new RowTimestampColInfo(useServerTimestamp, rowTimestamp);
-        for (int i = 0; i < values.length; i++) {
-            byte[] value = values[i];
+        for (int i = 0, j = numSplColumns; j < values.length; j++, i++) {
+            byte[] value = values[j];
             PColumn column = table.getColumns().get(columnIndexes[i]);
             if (SchemaUtil.isPKColumn(column)) {
                 pkValues[pkSlotIndex[i]] = value;
@@ -163,8 +167,8 @@ public class UpsertCompiler {
         mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo, onDupKeyBytes));
     }
     
-    private static MutationState upsertSelect(StatementContext childContext, TableRef tableRef, RowProjector projector,
-            ResultIterator iterator, int[] columnIndexes, int[] pkSlotIndexes, boolean useServerTimestamp) throws SQLException {
+    public static MutationState upsertSelect(StatementContext childContext, TableRef tableRef, RowProjector projector,
+            ResultIterator iterator, int[] columnIndexes, int[] pkSlotIndexes, boolean useServerTimestamp, boolean prefixSysColValues) throws SQLException {
         PhoenixStatement statement = childContext.getStatement();
         PhoenixConnection connection = statement.getConnection();
         ConnectionQueryServices services = connection.getQueryServices();
@@ -172,7 +176,23 @@ public class UpsertCompiler {
                 QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
         int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
         boolean isAutoCommit = connection.getAutoCommit();
-        byte[][] values = new byte[columnIndexes.length][];
+        int numSplColumns =
+                (tableRef.getTable().isMultiTenant() ? 1 : 0)
+                        + (tableRef.getTable().getViewIndexId() != null ? 1 : 0);
+        byte[][] values = new byte[columnIndexes.length + numSplColumns][];
+        if(prefixSysColValues) {
+            int i = 0;
+            if(tableRef.getTable().isMultiTenant()) {
+                values[i++] = connection.getTenantId().getBytes();
+            }
+            if(tableRef.getTable().getViewIndexId() != null) {
+                values[i++] = PSmallint.INSTANCE.toBytes(tableRef.getTable().getViewIndexId());
+            }
+            
+            for(int k = 0; k <  pkSlotIndexes.length; k++) {
+                pkSlotIndexes[k] += (i + (tableRef.getTable().getBucketNum() != null ? 1 : 0));
+            }
+        }
         int rowCount = 0;
         Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize);
         PTable table = tableRef.getTable();
@@ -192,7 +212,7 @@ public class UpsertCompiler {
         try (ResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) {
             ImmutableBytesWritable ptr = new ImmutableBytesWritable();
             while (rs.next()) {
-                for (int i = 0; i < values.length; i++) {
+                for (int i = 0, j = numSplColumns; j < values.length; j++, i++) {
                     PColumn column = table.getColumns().get(columnIndexes[i]);
                     byte[] bytes = rs.getBytes(i + 1);
                     ptr.set(bytes == null ? ByteUtil.EMPTY_BYTE_ARRAY : bytes);
@@ -212,9 +232,9 @@ public class UpsertCompiler {
                             precision, scale, SortOrder.getDefault(), 
                             column.getMaxLength(), column.getScale(), column.getSortOrder(),
                             table.rowKeyOrderOptimizable());
-                    values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr);
+                    values[j] = ByteUtil.copyKeyBytesIfNecessary(ptr);
                 }
-                setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, null);
+                setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, null, numSplColumns);
                 rowCount++;
                 // Commit a batch if auto commit is true and we're at our batch size
                 if (isAutoCommit && rowCount % batchSize == 0) {
@@ -256,7 +276,7 @@ public class UpsertCompiler {
             StatementContext childContext = new StatementContext(statement, false);
             // Clone the row projector as it's not thread safe and would be used simultaneously by
             // multiple threads otherwise.
-            MutationState state = upsertSelect(childContext, tableRef, projector.cloneIfNecessary(), iterator, columnIndexes, pkSlotIndexes, useSeverTimestamp);
+            MutationState state = upsertSelect(childContext, tableRef, projector.cloneIfNecessary(), iterator, columnIndexes, pkSlotIndexes, useSeverTimestamp, false);
             return state;
         }
         
@@ -798,7 +818,7 @@ public class UpsertCompiler {
                 public MutationState execute() throws SQLException {
                     ResultIterator iterator = queryPlan.iterator();
                     if (parallelIteratorFactory == null) {
-                        return upsertSelect(new StatementContext(statement), tableRef, projector, iterator, columnIndexes, pkSlotIndexes, useServerTimestamp);
+                        return upsertSelect(new StatementContext(statement), tableRef, projector, iterator, columnIndexes, pkSlotIndexes, useServerTimestamp, false);
                     }
                     try {
                         parallelIteratorFactory.setRowProjector(projector);
@@ -1043,7 +1063,7 @@ public class UpsertCompiler {
                     indexMaintainer = table.getIndexMaintainer(parentTable, connection);
                     viewConstants = IndexUtil.getViewConstants(parentTable);
                 }
-                setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, onDupKeyBytes);
+                setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, onDupKeyBytes, 0);
                 return new MutationState(tableRef, mutation, 0, maxSize, connection);
             }