You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2017/01/27 01:19:07 UTC
[23/26] phoenix git commit: PHOENIX-3351 Implement TODOs in
PhoenixTableModify#upsert to allow writes to tenant-specific
tables (Rajeshbabu)
PHOENIX-3351 Implement TODOs in PhoenixTableModify#upsert to allow writes to tenant-specific tables (Rajeshbabu)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b9323e1d
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b9323e1d
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b9323e1d
Branch: refs/heads/calcite
Commit: b9323e1d30ba6b449f059b86ae7b8157de16b13d
Parents: 9b7f3ca
Author: Rajeshbabu Chintaguntla <ra...@apache.org>
Authored: Mon Jan 23 15:05:37 2017 +0530
Committer: Rajeshbabu Chintaguntla <ra...@apache.org>
Committed: Mon Jan 23 15:05:37 2017 +0530
----------------------------------------------------------------------
.../apache/phoenix/compile/UpsertCompiler.java | 44 ++++++++++++++------
1 file changed, 32 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b9323e1d/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 8837445..32ce6ad 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -98,6 +98,7 @@ import org.apache.phoenix.schema.TypeMismatchException;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PSmallint;
import org.apache.phoenix.schema.types.PTimestamp;
import org.apache.phoenix.schema.types.PUnsignedLong;
import org.apache.phoenix.schema.types.PVarbinary;
@@ -116,7 +117,7 @@ public class UpsertCompiler {
private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes,
PTable table, Map<ImmutableBytesPtr, RowMutationState> mutation,
PhoenixStatement statement, boolean useServerTimestamp, IndexMaintainer maintainer,
- byte[][] viewConstants, byte[] onDupKeyBytes) throws SQLException {
+ byte[][] viewConstants, byte[] onDupKeyBytes, int numSplColumns) throws SQLException {
Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length);
byte[][] pkValues = new byte[table.getPKColumns().size()][];
// If the table uses salting, the first byte is the salting byte, set to an empty array
@@ -124,10 +125,13 @@ public class UpsertCompiler {
if (table.getBucketNum() != null) {
pkValues[0] = new byte[] {0};
}
+ for(int i = 0; i < numSplColumns; i++) {
+ pkValues[i] = values[i];
+ }
Long rowTimestamp = null; // case when the table doesn't have a row timestamp column
RowTimestampColInfo rowTsColInfo = new RowTimestampColInfo(useServerTimestamp, rowTimestamp);
- for (int i = 0; i < values.length; i++) {
- byte[] value = values[i];
+ for (int i = 0, j = numSplColumns; j < values.length; j++, i++) {
+ byte[] value = values[j];
PColumn column = table.getColumns().get(columnIndexes[i]);
if (SchemaUtil.isPKColumn(column)) {
pkValues[pkSlotIndex[i]] = value;
@@ -163,8 +167,8 @@ public class UpsertCompiler {
mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo, onDupKeyBytes));
}
- private static MutationState upsertSelect(StatementContext childContext, TableRef tableRef, RowProjector projector,
- ResultIterator iterator, int[] columnIndexes, int[] pkSlotIndexes, boolean useServerTimestamp) throws SQLException {
+ public static MutationState upsertSelect(StatementContext childContext, TableRef tableRef, RowProjector projector,
+ ResultIterator iterator, int[] columnIndexes, int[] pkSlotIndexes, boolean useServerTimestamp, boolean prefixSysColValues) throws SQLException {
PhoenixStatement statement = childContext.getStatement();
PhoenixConnection connection = statement.getConnection();
ConnectionQueryServices services = connection.getQueryServices();
@@ -172,7 +176,23 @@ public class UpsertCompiler {
QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
boolean isAutoCommit = connection.getAutoCommit();
- byte[][] values = new byte[columnIndexes.length][];
+ int numSplColumns =
+ (tableRef.getTable().isMultiTenant() ? 1 : 0)
+ + (tableRef.getTable().getViewIndexId() != null ? 1 : 0);
+ byte[][] values = new byte[columnIndexes.length + numSplColumns][];
+ if(prefixSysColValues) {
+ int i = 0;
+ if(tableRef.getTable().isMultiTenant()) {
+ values[i++] = connection.getTenantId().getBytes();
+ }
+ if(tableRef.getTable().getViewIndexId() != null) {
+ values[i++] = PSmallint.INSTANCE.toBytes(tableRef.getTable().getViewIndexId());
+ }
+
+ for(int k = 0; k < pkSlotIndexes.length; k++) {
+ pkSlotIndexes[k] += (i + (tableRef.getTable().getBucketNum() != null ? 1 : 0));
+ }
+ }
int rowCount = 0;
Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize);
PTable table = tableRef.getTable();
@@ -192,7 +212,7 @@ public class UpsertCompiler {
try (ResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) {
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
while (rs.next()) {
- for (int i = 0; i < values.length; i++) {
+ for (int i = 0, j = numSplColumns; j < values.length; j++, i++) {
PColumn column = table.getColumns().get(columnIndexes[i]);
byte[] bytes = rs.getBytes(i + 1);
ptr.set(bytes == null ? ByteUtil.EMPTY_BYTE_ARRAY : bytes);
@@ -212,9 +232,9 @@ public class UpsertCompiler {
precision, scale, SortOrder.getDefault(),
column.getMaxLength(), column.getScale(), column.getSortOrder(),
table.rowKeyOrderOptimizable());
- values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr);
+ values[j] = ByteUtil.copyKeyBytesIfNecessary(ptr);
}
- setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, null);
+ setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, null, numSplColumns);
rowCount++;
// Commit a batch if auto commit is true and we're at our batch size
if (isAutoCommit && rowCount % batchSize == 0) {
@@ -256,7 +276,7 @@ public class UpsertCompiler {
StatementContext childContext = new StatementContext(statement, false);
// Clone the row projector as it's not thread safe and would be used simultaneously by
// multiple threads otherwise.
- MutationState state = upsertSelect(childContext, tableRef, projector.cloneIfNecessary(), iterator, columnIndexes, pkSlotIndexes, useSeverTimestamp);
+ MutationState state = upsertSelect(childContext, tableRef, projector.cloneIfNecessary(), iterator, columnIndexes, pkSlotIndexes, useSeverTimestamp, false);
return state;
}
@@ -798,7 +818,7 @@ public class UpsertCompiler {
public MutationState execute() throws SQLException {
ResultIterator iterator = queryPlan.iterator();
if (parallelIteratorFactory == null) {
- return upsertSelect(new StatementContext(statement), tableRef, projector, iterator, columnIndexes, pkSlotIndexes, useServerTimestamp);
+ return upsertSelect(new StatementContext(statement), tableRef, projector, iterator, columnIndexes, pkSlotIndexes, useServerTimestamp, false);
}
try {
parallelIteratorFactory.setRowProjector(projector);
@@ -1043,7 +1063,7 @@ public class UpsertCompiler {
indexMaintainer = table.getIndexMaintainer(parentTable, connection);
viewConstants = IndexUtil.getViewConstants(parentTable);
}
- setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, onDupKeyBytes);
+ setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, onDupKeyBytes, 0);
return new MutationState(tableRef, mutation, 0, maxSize, connection);
}