You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2016/11/04 22:13:37 UTC
[10/50] [abbrv] phoenix git commit: PHOENIX-6 Support ON DUPLICATE
KEY construct
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index affa778..e4a64e3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -17,20 +17,76 @@
*/
package org.apache.phoenix.index;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.coprocessor.generated.PTableProtos;
+import org.apache.phoenix.exception.DataExceedsCapacityException;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+import org.apache.phoenix.expression.KeyValueColumnExpression;
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+import org.apache.phoenix.expression.visitor.StatelessTraverseAllExpressionVisitor;
import org.apache.phoenix.hbase.index.covered.IndexMetaData;
import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
+import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.hbase.index.write.IndexWriter;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PRow;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.TrustedByteArrayOutputStream;
+
+import com.google.common.collect.Lists;
/**
* Index builder for covered-columns index that ties into phoenix for faster use.
*/
public class PhoenixIndexBuilder extends NonTxIndexBuilder {
+ public static final String ATOMIC_OP_ATTRIB = "_ATOMIC_OP_ATTRIB";
+ private static final byte[] ON_DUP_KEY_IGNORE_BYTES = new byte[] {1}; // boolean true
+ private static final int ON_DUP_KEY_HEADER_BYTE_SIZE = Bytes.SIZEOF_SHORT + Bytes.SIZEOF_BOOLEAN;
+
+    /**
+     * Collects every cell of a mutation, across all of its column families, into one new list.
+     *
+     * @param m mutation whose family cell map is read
+     * @param estimatedSize expected number of cells; only used to presize the returned list
+     * @return a new list holding the cells of each family, in the iteration order of the family map
+     */
+    private static List<Cell> flattenCells(Mutation m, int estimatedSize) throws IOException {
+        List<Cell> allCells = Lists.newArrayListWithExpectedSize(estimatedSize);
+        for (List<Cell> familyCells : m.getFamilyCellMap().values()) {
+            allCells.addAll(familyCells);
+        }
+        return allCells;
+    }
+
+    /**
+     * Appends every cell of a mutation, across all of its column families, to an existing list.
+     *
+     * @param m mutation whose family cell map is read
+     * @param flattenedCells destination list; the cells are added to its end
+     */
+    private static void flattenCells(Mutation m, List<Cell> flattenedCells) throws IOException {
+        for (Map.Entry<byte[], List<Cell>> family : m.getFamilyCellMap().entrySet()) {
+            flattenedCells.addAll(family.getValue());
+        }
+    }
+
@Override
public IndexMetaData getIndexMetaData(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
return new PhoenixIndexMetaData(env, miniBatchOp.getOperation(0).getAttributesMap());
@@ -53,4 +109,266 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder {
@Override
public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp, IndexMetaData context) throws IOException {
}
+
+    /**
+     * Tells whether a mutation carries the {@link #ATOMIC_OP_ATTRIB} attribute.
+     *
+     * @param m mutation to inspect
+     * @return true if the attribute is present (non-null), false otherwise
+     */
+    @Override
+    public boolean isAtomicOp(Mutation m) throws IOException {
+        byte[] atomicOpBytes = m.getAttribute(ATOMIC_OP_ATTRIB);
+        return atomicOpBytes != null;
+    }
+
+    /**
+     * Puts every family-to-cells entry of {@code source} into the family cell map of {@code target}.
+     * The cell lists themselves are not copied: both mutations reference the same list instances.
+     */
+    private static void transferCells(Mutation source, Mutation target) {
+        Map<byte[], List<Cell>> targetCells = target.getFamilyCellMap();
+        Map<byte[], List<Cell>> sourceCells = source.getFamilyCellMap();
+        targetCells.putAll(sourceCells);
+    }
+    /**
+     * Sets every attribute found on {@code source} on {@code target}, under the same name.
+     */
+    private static void transferAttributes(Mutation source, Mutation target) {
+        Map<String, byte[]> attributes = source.getAttributesMap();
+        for (String attributeName : attributes.keySet()) {
+            target.setAttribute(attributeName, attributes.get(attributeName));
+        }
+    }
+    /**
+     * Builds a Put for the row of the given Increment that carries the Increment's cells and
+     * attributes, and wraps it in a single-element list.
+     *
+     * @param inc the Increment whose row key, family cell map and attributes are reused
+     * @return an immutable singleton list containing the new Put
+     */
+    private static List<Mutation> convertIncrementToPutInSingletonList(Increment inc) {
+        Put put = new Put(inc.getRow());
+        // Same work as transferCells(inc, put)
+        put.getFamilyCellMap().putAll(inc.getFamilyCellMap());
+        // Same work as transferAttributes(inc, put)
+        for (Map.Entry<String, byte[]> attribute : inc.getAttributesMap().entrySet()) {
+            put.setAttribute(attribute.getKey(), attribute.getValue());
+        }
+        return Collections.<Mutation>singletonList(put);
+    }
+
+ /**
+ * Evaluates the ON DUPLICATE KEY information attached to an Increment (under
+ * {@link #ATOMIC_OP_ATTRIB}) against the current state of the row and returns the
+ * mutations to apply in its place.
+ * <p>
+ * For ON DUPLICATE KEY IGNORE the row is probed with a FirstKeyOnlyFilter: if it does not
+ * exist the Increment is converted to a Put, otherwise an empty list is returned.
+ * Otherwise the attribute is deserialized as a boolean (skip first operation), a short
+ * (repeat count of the first operation) and a sequence of {expressions, PTable} pairs read
+ * until EOF. Each operation is evaluated against the current row state (the existing row,
+ * or the cells of the Increment when the row does not exist), and its result becomes the
+ * state seen by the next evaluation.
+ * @param inc the Increment carrying the row key, the new cells and the serialized clause(s);
+ * its {@link #ATOMIC_OP_ATTRIB} attribute is set to null by this call
+ * @return the Put and/or Delete built from the final row state, an empty list when the
+ * IGNORE clause finds an existing row, or null when the attribute is missing
+ * @throws IOException if reading the row or deserializing the attribute fails
+ */
+ @Override
+ public List<Mutation> executeAtomicOp(Increment inc) throws IOException {
+ byte[] opBytes = inc.getAttribute(ATOMIC_OP_ATTRIB);
+ if (opBytes == null) { // Unexpected
+ return null;
+ }
+ inc.setAttribute(ATOMIC_OP_ATTRIB, null);
+ Put put = null;
+ Delete delete = null;
+ // We can neither use the time stamp in the Increment to set the Get time range
+ // nor set the Put/Delete time stamp and have this be atomic, as HBase does not
+ // handle that. Though we disallow using the ON DUPLICATE KEY clause when the
+ // CURRENT_SCN is set, we still may have a time stamp set as of when the table
+ // was resolved on the client side. We need to ignore this as well due to limitations
+ // in HBase, but this isn't too bad as the time will be very close to the current
+ // time anyway.
+ long ts = HConstants.LATEST_TIMESTAMP;
+ byte[] rowKey = inc.getRow();
+ final Get get = new Get(rowKey);
+ if (isDupKeyIgnore(opBytes)) {
+ // Only row existence matters for IGNORE, so fetch just the first key
+ get.setFilter(new FirstKeyOnlyFilter());
+ Result result = this.env.getRegion().get(get);
+ return result.isEmpty() ? convertIncrementToPutInSingletonList(inc) : Collections.<Mutation>emptyList();
+ }
+ ByteArrayInputStream stream = new ByteArrayInputStream(opBytes);
+ DataInputStream input = new DataInputStream(stream);
+ // Header: boolean followed by short, in the order written by serializeOnDupKeyUpdate
+ boolean skipFirstOp = input.readBoolean();
+ short repeat = input.readShort();
+ // Single-element array so the anonymous visitor below can update the count
+ final int[] estimatedSizeHolder = {0};
+ List<Pair<PTable, List<Expression>>> operations = Lists.newArrayListWithExpectedSize(3);
+ while (true) {
+ // Adds every column referenced by an expression to the Get and counts the references
+ ExpressionVisitor<Void> visitor = new StatelessTraverseAllExpressionVisitor<Void>() {
+ @Override
+ public Void visit(KeyValueColumnExpression expression) {
+ get.addColumn(expression.getColumnFamily(), expression.getColumnName());
+ estimatedSizeHolder[0]++;
+ return null;
+ }
+ };
+ try {
+ int nExpressions = WritableUtils.readVInt(input);
+ List<Expression>expressions = Lists.newArrayListWithExpectedSize(nExpressions);
+ for (int i = 0; i < nExpressions; i++) {
+ Expression expression = ExpressionType.values()[WritableUtils.readVInt(input)].newInstance();
+ expression.readFields(input);
+ expressions.add(expression);
+ expression.accept(visitor);
+ }
+ PTableProtos.PTable tableProto = PTableProtos.PTable.parseDelimitedFrom(input);
+ PTable table = PTableImpl.createFromProto(tableProto);
+ operations.add(new Pair<>(table, expressions));
+ } catch (EOFException e) {
+ // End of the serialized bytes: no more {expressions, PTable} pairs to read
+ break;
+ }
+ }
+ int estimatedSize = estimatedSizeHolder[0];
+ if (get.getFamilyMap().isEmpty()) {
+ // No column is referenced by any expression, so only row existence is needed
+ get.setFilter(new FirstKeyOnlyFilter());
+ }
+ MultiKeyValueTuple tuple;
+ List<Cell>cells = this.env.getRegion().get(get, false);
+ if (cells.isEmpty()) {
+ if (skipFirstOp) {
+ // Nothing left to evaluate once the first operation is skipped: plain insert
+ if (operations.size() <= 1 && repeat <= 1) {
+ return convertIncrementToPutInSingletonList(inc);
+ }
+ repeat--; // Skip first operation (if first wasn't ON DUPLICATE KEY IGNORE)
+ }
+ // Base current state off of new row
+ tuple = new MultiKeyValueTuple(flattenCells(inc, estimatedSize));
+ } else {
+ // Base current state off of existing row
+ tuple = new MultiKeyValueTuple(cells);
+ }
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ for (int opIndex = 0; opIndex < operations.size(); opIndex++) {
+ Pair<PTable, List<Expression>> operation = operations.get(opIndex);
+ PTable table = operation.getFirst();
+ List<Expression> expressions = operation.getSecond();
+ for (int j = 0; j < repeat; j++) { // repeater loop
+ ptr.set(rowKey);
+ PRow row = table.newRow(GenericKeyValueBuilder.INSTANCE, ts, ptr, false);
+ for (int i = 0; i < expressions.size(); i++) {
+ Expression expression = expressions.get(i);
+ ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
+ expression.evaluate(tuple, ptr);
+ // NOTE(review): expression i maps to column i + 1; presumably position 0 of the
+ // serialized table is a row key column - confirm against the serializing side
+ PColumn column = table.getColumns().get(i + 1);
+ Object value = expression.getDataType().toObject(ptr, column.getSortOrder());
+ // We are guaranteed that the two columns will have the
+ // same type.
+ if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(),
+ expression.getMaxLength(), expression.getScale(), column.getMaxLength(),
+ column.getScale())) {
+ throw new DataExceedsCapacityException(column.getDataType(), column.getMaxLength(),
+ column.getScale());
+ }
+ column.getDataType().coerceBytes(ptr, value, expression.getDataType(), expression.getMaxLength(),
+ expression.getScale(), expression.getSortOrder(),column.getMaxLength(), column.getScale(),
+ column.getSortOrder(), table.rowKeyOrderOptimizable());
+ byte[] bytes = ByteUtil.copyKeyBytesIfNecessary(ptr);
+ row.setValue(column, bytes);
+ }
+ // The cells produced by this evaluation become the row state for the next one
+ List<Cell> flattenedCells = Lists.newArrayListWithExpectedSize(estimatedSize);
+ List<Mutation> mutations = row.toRowMutations();
+ for (Mutation source : mutations) {
+ flattenCells(source, flattenedCells);
+ }
+ tuple.setKeyValues(flattenedCells);
+ }
+ // Repeat only applies to first statement
+ repeat = 1;
+ }
+
+ // Split the final row state into a Put (cells of type Put) and a Delete (all other cells),
+ // creating each mutation lazily and copying the Increment's attributes onto it
+ List<Mutation> mutations = Lists.newArrayListWithExpectedSize(2);
+ for (int i = 0; i < tuple.size(); i++) {
+ Cell cell = tuple.getValue(i);
+ if (Type.codeToType(cell.getTypeByte()) == Type.Put) {
+ if (put == null) {
+ put = new Put(rowKey);
+ transferAttributes(inc, put);
+ mutations.add(put);
+ }
+ put.add(cell);
+ } else {
+ if (delete == null) {
+ delete = new Delete(rowKey);
+ transferAttributes(inc, delete);
+ mutations.add(delete);
+ }
+ delete.addDeleteMarker(cell);
+ }
+ }
+ return mutations;
+ }
+
+    /**
+     * Returns the serialized form of an ON DUPLICATE KEY IGNORE clause.
+     * <p>
+     * A fresh copy is returned on every call so that a caller modifying the array cannot
+     * corrupt the shared constant that {@link #isDupKeyIgnore(byte[])} compares against.
+     *
+     * @return a new array whose content equals the ON DUPLICATE KEY IGNORE marker bytes
+     */
+    public static byte[] serializeOnDupKeyIgnore() {
+        return ON_DUP_KEY_IGNORE_BYTES.clone();
+    }
+
+    /**
+     * Serializes ON DUPLICATE KEY UPDATE info using the following format:
+     * 1) Boolean flag telling whether the first ON DUPLICATE KEY clause is skipped when the row
+     *    does not exist yet (this is how executeAtomicOp interprets it). It is written as true
+     *    here and is reset to false by combineOnDupKey when there are other UPSERT VALUES
+     *    clauses earlier in the same batch for this row key, so that the clause is executed.
+     *    We need this for two main cases:
+     *      UPSERT VALUES followed by UPSERT VALUES ON DUPLICATE KEY UPDATE
+     *      UPSERT VALUES ON DUPLICATE KEY IGNORE followed by UPSERT VALUES ON DUPLICATE KEY UPDATE
+     * 2) Short value tracking how many times the first clause should be executed. This
+     *    optimizes the case of the same clause being executed many times by serializing it
+     *    only once.
+     * 3) Repeating {List<Expression>, PTable} pairs that encapsulate the ON DUPLICATE KEY clause.
+     * @param table table representing columns being updated
+     * @param expressions list of expressions to evaluate for updating columns
+     * @return serialized byte array representation of ON DUPLICATE KEY UPDATE info
+     */
+    public static byte[] serializeOnDupKeyUpdate(PTable table, List<Expression> expressions) {
+        PTableProtos.PTable tableProto = PTableImpl.toProto(table);
+        int initialCapacity = tableProto.getSerializedSize() * 2;
+        try (ByteArrayOutputStream buffer = new ByteArrayOutputStream(initialCapacity)) {
+            DataOutputStream out = new DataOutputStream(buffer);
+            // Header: skip-first flag (true: clause is skipped if the row does not exist yet) ...
+            out.writeBoolean(true);
+            // ... followed by the repeat count: execute this ON DUPLICATE KEY once
+            out.writeShort(1);
+            // Expression list: count, then {expression type ordinal, expression body} for each
+            WritableUtils.writeVInt(out, expressions.size());
+            for (Expression expression : expressions) {
+                WritableUtils.writeVInt(out, ExpressionType.valueOf(expression).ordinal());
+                expression.write(out);
+            }
+            // Length-delimited table definition closes the {expressions, table} pair
+            tableProto.writeDelimitedTo(out);
+            return buffer.toByteArray();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Returns a copy of the serialized ON DUPLICATE KEY bytes whose leading boolean is cleared.
+     * The input array is left untouched.
+     *
+     * @param oldOnDupKeyBytes serialized ON DUPLICATE KEY bytes; must hold at least one byte
+     * @return a new array identical to the input except that byte 0 is 0
+     */
+    private static byte[] doNotSkipFirstOnDupKey(byte[] oldOnDupKeyBytes) {
+        byte[] copy = oldOnDupKeyBytes.clone();
+        copy[0] = 0; // false means do not skip first ON DUPLICATE KEY
+        return copy;
+    }
+
+    /**
+     * Combines the serialized ON DUPLICATE KEY info of an earlier statement with that of a later
+     * statement for the same row, producing the bytes to send for both.
+     * <ul>
+     * <li>new is null: returns null.</li>
+     * <li>old is null: returns a copy of new with the skip-first flag cleared.</li>
+     * <li>new is ON DUPLICATE KEY IGNORE: returns old unchanged.</li>
+     * <li>old is ON DUPLICATE KEY IGNORE: returns new with the skip-first flag cleared.</li>
+     * <li>both are UPDATE clauses with byte-identical payloads: returns one payload with the
+     * repeat counts added, as long as the sum still fits in the serialized short.</li>
+     * <li>otherwise: returns the old header and payload followed by the new payload, written
+     * once per repeat of the new clause.</li>
+     * </ul>
+     * NOTE(review): when old is null and new is ON DUPLICATE KEY IGNORE, the result is the
+     * single byte {0}, which isDupKeyIgnore no longer recognizes and which has no repeat
+     * short for executeAtomicOp to read - confirm callers never rely on this combination.
+     *
+     * @param oldOnDupKeyBytes serialized info already accumulated for the row, or null
+     * @param newOnDupKeyBytes serialized info of the statement being added, or null
+     * @return the combined serialized info, or null if newOnDupKeyBytes is null
+     */
+    public static byte[] combineOnDupKey(byte[] oldOnDupKeyBytes, byte[] newOnDupKeyBytes) {
+        // If new ON DUPLICATE KEY is null, then reset back to null
+        if (newOnDupKeyBytes == null) {
+            return null;
+        }
+        // If old ON DUPLICATE KEY is null, then the new value always takes effect
+        if (oldOnDupKeyBytes == null) {
+            return doNotSkipFirstOnDupKey(newOnDupKeyBytes);
+        }
+        // If the new UPSERT VALUES statement has an ON DUPLICATE KEY IGNORE, and there
+        // is an already existing UPSERT VALUES statement with an ON DUPLICATE KEY clause,
+        // then we can just keep that one as the new one has no impact.
+        if (isDupKeyIgnore(newOnDupKeyBytes)) {
+            return oldOnDupKeyBytes;
+        }
+        boolean isOldDupKeyIgnore = isDupKeyIgnore(oldOnDupKeyBytes);
+        int newPayloadLength = newOnDupKeyBytes.length - ON_DUP_KEY_HEADER_BYTE_SIZE;
+        try (TrustedByteArrayOutputStream stream = new TrustedByteArrayOutputStream(
+                     Math.max(0, oldOnDupKeyBytes.length - ON_DUP_KEY_HEADER_BYTE_SIZE) + newOnDupKeyBytes.length);
+             ByteArrayInputStream oldStream = new ByteArrayInputStream(oldOnDupKeyBytes);
+             ByteArrayInputStream newStream = new ByteArrayInputStream(newOnDupKeyBytes);
+             DataOutputStream output = new DataOutputStream(stream);
+             DataInputStream oldInput = new DataInputStream(oldStream);
+             DataInputStream newInput = new DataInputStream(newStream)) {
+
+            boolean execute1 = oldInput.readBoolean();
+            newInput.readBoolean(); // ignore
+            int repeating2 = newInput.readShort();
+            if (isOldDupKeyIgnore) {
+                output.writeBoolean(false); // Will force subsequent ON DUPLICATE KEY UPDATE statement to execute
+                output.writeShort(repeating2);
+                output.write(newOnDupKeyBytes, ON_DUP_KEY_HEADER_BYTE_SIZE, newPayloadLength);
+            } else {
+                int repeating1 = oldInput.readShort();
+                // Only computed here: the IGNORE marker is shorter than the header
+                int oldPayloadLength = oldOnDupKeyBytes.length - ON_DUP_KEY_HEADER_BYTE_SIZE;
+                // Compare each payload over its OWN length; using the old length for the new
+                // buffer would read past its end or treat a mere prefix as a match.
+                boolean samePayload = Bytes.compareTo(
+                        oldOnDupKeyBytes, ON_DUP_KEY_HEADER_BYTE_SIZE, oldPayloadLength,
+                        newOnDupKeyBytes, ON_DUP_KEY_HEADER_BYTE_SIZE, newPayloadLength) == 0;
+                int combinedRepeat = repeating1 + repeating2;
+                output.writeBoolean(execute1);
+                if (samePayload && combinedRepeat <= Short.MAX_VALUE) {
+                    // If both old and new ON DUPLICATE KEY UPDATE clauses match,
+                    // reduce the size of data we're sending over the wire.
+                    // The sum must fit in a short, as writeShort would silently truncate it.
+                    // TODO: optimize size of RPC more.
+                    output.writeShort(combinedRepeat);
+                    output.write(newOnDupKeyBytes, ON_DUP_KEY_HEADER_BYTE_SIZE, newPayloadLength);
+                } else {
+                    output.writeShort(repeating1); // retain first ON DUPLICATE KEY UPDATE having repeated
+                    output.write(oldOnDupKeyBytes, ON_DUP_KEY_HEADER_BYTE_SIZE, oldPayloadLength);
+                    // If the new ON DUPLICATE KEY UPDATE was repeating, we need to write it multiple times as only the first
+                    // statement is affected by the repeating amount
+                    for (int i = 0; i < repeating2; i++) {
+                        output.write(newOnDupKeyBytes, ON_DUP_KEY_HEADER_BYTE_SIZE, newPayloadLength);
+                    }
+                }
+            }
+            return stream.toByteArray();
+        } catch (IOException e) { // Shouldn't be possible with ByteInput/Output streams
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Tells whether the given bytes are the serialized form of ON DUPLICATE KEY IGNORE.
+     *
+     * @param onDupKeyBytes serialized ON DUPLICATE KEY info, may be null
+     * @return true if the bytes are non-null and compare equal to the IGNORE marker bytes
+     */
+    public static boolean isDupKeyIgnore(byte[] onDupKeyBytes) {
+        if (onDupKeyBytes == null) {
+            return false;
+        }
+        return Bytes.compareTo(ON_DUP_KEY_IGNORE_BYTES, onDupKeyBytes) == 0;
+    }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index da7c7e8..2a7cd0e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -592,8 +592,10 @@ public class PhoenixStatement implements Statement, SQLCloseable {
}
private static class ExecutableUpsertStatement extends UpsertStatement implements CompilableStatement {
- private ExecutableUpsertStatement(NamedTableNode table, HintNode hintNode, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount, Map<String, UDFParseNode> udfParseNodes) {
- super(table, hintNode, columns, values, select, bindCount, udfParseNodes);
+ private ExecutableUpsertStatement(NamedTableNode table, HintNode hintNode, List<ColumnName> columns,
+ List<ParseNode> values, SelectStatement select, int bindCount, Map<String, UDFParseNode> udfParseNodes,
+ List<Pair<ColumnName,ParseNode>> onDupKeyPairs) {
+ super(table, hintNode, columns, values, select, bindCount, udfParseNodes, onDupKeyPairs);
}
@SuppressWarnings("unchecked")
@@ -1156,8 +1158,9 @@ public class PhoenixStatement implements Statement, SQLCloseable {
}
@Override
- public ExecutableUpsertStatement upsert(NamedTableNode table, HintNode hintNode, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount, Map<String, UDFParseNode> udfParseNodes) {
- return new ExecutableUpsertStatement(table, hintNode, columns, values, select, bindCount, udfParseNodes);
+ public ExecutableUpsertStatement upsert(NamedTableNode table, HintNode hintNode, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount,
+ Map<String, UDFParseNode> udfParseNodes, List<Pair<ColumnName,ParseNode>> onDupKeyPairs) {
+ return new ExecutableUpsertStatement(table, hintNode, columns, values, select, bindCount, udfParseNodes, onDupKeyPairs);
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index 7d4e679..232a91e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -702,8 +702,11 @@ public class ParseNodeFactory {
orderBy == null ? Collections.<OrderByNode>emptyList() : orderBy, limit, offset, bindCount, isAggregate, hasSequence, selects == null ? Collections.<SelectStatement>emptyList() : selects, udfParseNodes);
}
- public UpsertStatement upsert(NamedTableNode table, HintNode hint, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount, Map<String, UDFParseNode> udfParseNodes) {
- return new UpsertStatement(table, hint, columns, values, select, bindCount, udfParseNodes);
+ public UpsertStatement upsert(NamedTableNode table, HintNode hint, List<ColumnName> columns, List<ParseNode> values,
+ SelectStatement select, int bindCount,
+ Map<String, UDFParseNode> udfParseNodes,
+ List<Pair<ColumnName,ParseNode>> onDupKeyPairs) {
+ return new UpsertStatement(table, hint, columns, values, select, bindCount, udfParseNodes, onDupKeyPairs);
}
public DeleteStatement delete(NamedTableNode table, HintNode hint, ParseNode node, List<OrderByNode> orderBy, LimitNode limit, int bindCount, Map<String, UDFParseNode> udfParseNodes) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
index 48698bd..fca7463 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
@@ -21,20 +21,24 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.hbase.util.Pair;
+
public class UpsertStatement extends DMLStatement {
private final List<ColumnName> columns;
private final List<ParseNode> values;
private final SelectStatement select;
private final HintNode hint;
+ private final List<Pair<ColumnName,ParseNode>> onDupKeyPairs;
public UpsertStatement(NamedTableNode table, HintNode hint, List<ColumnName> columns,
List<ParseNode> values, SelectStatement select, int bindCount,
- Map<String, UDFParseNode> udfParseNodes) {
+ Map<String, UDFParseNode> udfParseNodes, List<Pair<ColumnName,ParseNode>> onDupKeyPairs) {
super(table, bindCount, udfParseNodes);
this.columns = columns == null ? Collections.<ColumnName>emptyList() : columns;
this.values = values;
this.select = select;
this.hint = hint == null ? HintNode.EMPTY_HINT_NODE : hint;
+ this.onDupKeyPairs = onDupKeyPairs;
}
public List<ColumnName> getColumns() {
@@ -52,4 +56,8 @@ public class UpsertStatement extends DMLStatement {
public HintNode getHint() {
return hint;
}
+
+    /**
+     * @return the {column name, parse node} pairs passed to the constructor as onDupKeyPairs,
+     * exactly as given (no copy is made)
+     */
+    public List<Pair<ColumnName,ParseNode>> getOnDupKeyPairs() {
+        return this.onDupKeyPairs;
+    }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
index a60229e..62d2e3f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
@@ -90,4 +90,14 @@ public class DelegateColumn extends DelegateDatum implements PColumn {
public boolean isDynamic() {
return getDelegate().isDynamic();
}
+
+    /** Uses the hash code of the wrapped column, so the wrapper hashes like its delegate. */
+    @Override
+    public int hashCode() {
+        Object wrapped = getDelegate();
+        return wrapped.hashCode();
+    }
+
+    /** Defers the equality decision to the wrapped column's own equals implementation. */
+    @Override
+    public boolean equals(Object o) {
+        Object wrapped = getDelegate();
+        return wrapped.equals(o);
+    }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index 3ee012f..7d39dfe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -106,13 +106,13 @@ public class DelegateTable implements PTable {
}
@Override
- public PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, byte[]... values) {
- return delegate.newRow(builder, ts, key, values);
+ public PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, boolean hasOnDupKey, byte[]... values) {
+ return delegate.newRow(builder, ts, key, hasOnDupKey, values);
}
@Override
- public PRow newRow(KeyValueBuilder builder, ImmutableBytesWritable key, byte[]... values) {
- return delegate.newRow(builder, key, values);
+ public PRow newRow(KeyValueBuilder builder, ImmutableBytesWritable key, boolean hasOnDupKey, byte[]... values) {
+ return delegate.newRow(builder, key, hasOnDupKey, values);
}
@Override
@@ -280,4 +280,14 @@ public class DelegateTable implements PTable {
public boolean isAppendOnlySchema() {
return delegate.isAppendOnlySchema();
}
+
+    /** Uses the hash code of the wrapped table, so the wrapper hashes like its delegate. */
+    @Override
+    public int hashCode() {
+        return this.delegate.hashCode();
+    }
+
+    /** Defers the equality decision to the wrapped table's own equals implementation. */
+    @Override
+    public boolean equals(Object obj) {
+        return this.delegate.equals(obj);
+    }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
index a556f76..ca827d8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
@@ -170,14 +170,14 @@ public class PColumnImpl implements PColumn {
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null) return false;
- if (getClass() != obj.getClass()) return false;
- PColumnImpl other = (PColumnImpl)obj;
+ if (! (obj instanceof PColumn) ) return false;
+ PColumn other = (PColumn)obj;
if (familyName == null) {
- if (other.familyName != null) return false;
- } else if (!familyName.equals(other.familyName)) return false;
+ if (other.getFamilyName() != null) return false;
+ } else if (!familyName.equals(other.getFamilyName())) return false;
if (name == null) {
- if (other.name != null) return false;
- } else if (!name.equals(other.name)) return false;
+ if (other.getName() != null) return false;
+ } else if (!name.equals(other.getName())) return false;
return true;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/java/org/apache/phoenix/schema/PRow.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PRow.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PRow.java
index 30deee6..fde83ba 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PRow.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PRow.java
@@ -40,7 +40,7 @@ public interface PRow {
/**
* Get the list of {@link org.apache.hadoop.hbase.client.Mutation} used to
* update an HTable after all mutations through calls to
- * {@link #setValue(PColumn, Object)} or {@link #delete()}.
+ * {@link #setValue(PColumn, byte[])} or {@link #delete()}.
* @return the list of mutations representing all changes made to a row
* @throws ConstraintViolationException if row data violates schema
* constraint
@@ -54,15 +54,6 @@ public interface PRow {
* @throws ConstraintViolationException if row data violates schema
* constraint
*/
- public void setValue(PColumn col, Object value);
-
- /**
- * Set a column value in the row
- * @param col the column for which the value is being set
- * @param value the value
- * @throws ConstraintViolationException if row data violates schema
- * constraint
- */
public void setValue(PColumn col, byte[] value);
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index b585323..01e8afe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -226,26 +226,28 @@ public interface PTable extends PMetaDataEntity {
* and the optional key values specified using values.
* @param ts the timestamp that the key value will have when committed
* @param key the row key of the key value
+ * @param hasOnDupKey true if row has an ON DUPLICATE KEY clause and false otherwise.
* @param values the optional key values
* @return the new row. Use {@link org.apache.phoenix.schema.PRow#toRowMutations()} to
* generate the Row to send to the HBase server.
* @throws ConstraintViolationException if row data violates schema
* constraint
*/
- PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, byte[]... values);
+ PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, boolean hasOnDupKey, byte[]... values);
/**
* Creates a new row for the PK values (from {@link #newKey(ImmutableBytesWritable, byte[][])}
* and the optional key values specified using values. The timestamp of the key value
* will be set by the HBase server.
* @param key the row key of the key value
+ * @param hasOnDupKey true if row has an ON DUPLICATE KEY clause and false otherwise.
* @param values the optional key values
* @return the new row. Use {@link org.apache.phoenix.schema.PRow#toRowMutations()} to
* generate the row to send to the HBase server.
* @throws ConstraintViolationException if row data violates schema
* constraint
*/
- PRow newRow(KeyValueBuilder builder, ImmutableBytesWritable key, byte[]... values);
+ PRow newRow(KeyValueBuilder builder, ImmutableBytesWritable key, boolean hasOnDupKey, byte[]... values);
/**
* Formulates a row key using the values provided. The values must be in
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 773ce76..627740b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -241,7 +242,7 @@ public class PTableImpl implements PTable {
table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
}
- public static PTableImpl makePTable(PTable table, List<PColumn> columns) throws SQLException {
+ public static PTableImpl makePTable(PTable table, Collection<PColumn> columns) throws SQLException {
return new PTableImpl(
table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), table.getTimeStamp(),
table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
@@ -251,7 +252,7 @@ public class PTableImpl implements PTable {
table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
}
- public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns) throws SQLException {
+ public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, Collection<PColumn> columns) throws SQLException {
return new PTableImpl(
table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp,
sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
@@ -261,7 +262,7 @@ public class PTableImpl implements PTable {
table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
}
- public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows) throws SQLException {
+ public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, Collection<PColumn> columns, boolean isImmutableRows) throws SQLException {
return new PTableImpl(
table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp,
sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
@@ -271,7 +272,7 @@ public class PTableImpl implements PTable {
table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
}
- public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows, boolean isWalDisabled,
+ public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, Collection<PColumn> columns, boolean isImmutableRows, boolean isWalDisabled,
boolean isMultitenant, boolean storeNulls, boolean isTransactional, long updateCacheFrequency, boolean isNamespaceMapped) throws SQLException {
return new PTableImpl(
table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp,
@@ -715,8 +716,8 @@ public class PTableImpl implements PTable {
}
}
- private PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, int i, byte[]... values) {
- PRow row = new PRowImpl(builder, key, ts, getBucketNum());
+ private PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, int i, boolean hasOnDupKey, byte[]... values) {
+ PRow row = new PRowImpl(builder, key, ts, getBucketNum(), hasOnDupKey);
if (i < values.length) {
for (PColumnFamily family : getColumnFamilies()) {
for (PColumn column : family.getColumns()) {
@@ -731,13 +732,13 @@ public class PTableImpl implements PTable {
@Override
public PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key,
- byte[]... values) {
- return newRow(builder, ts, key, 0, values);
+ boolean hasOnDupKey, byte[]... values) {
+ return newRow(builder, ts, key, 0, hasOnDupKey, values);
}
@Override
- public PRow newRow(KeyValueBuilder builder, ImmutableBytesWritable key, byte[]... values) {
- return newRow(builder, HConstants.LATEST_TIMESTAMP, key, values);
+ public PRow newRow(KeyValueBuilder builder, ImmutableBytesWritable key, boolean hasOnDupKey, byte[]... values) {
+ return newRow(builder, HConstants.LATEST_TIMESTAMP, key, hasOnDupKey, values);
}
@Override
@@ -775,14 +776,16 @@ public class PTableImpl implements PTable {
// default to the generic builder, and only override when we know on the client
private final KeyValueBuilder kvBuilder;
- private Put setValues;
+ private Mutation setValues;
private Delete unsetValues;
private Mutation deleteRow;
private final long ts;
+ private final boolean hasOnDupKey;
- public PRowImpl(KeyValueBuilder kvBuilder, ImmutableBytesWritable key, long ts, Integer bucketNum) {
+ public PRowImpl(KeyValueBuilder kvBuilder, ImmutableBytesWritable key, long ts, Integer bucketNum, boolean hasOnDupKey) {
this.kvBuilder = kvBuilder;
this.ts = ts;
+ this.hasOnDupKey = hasOnDupKey;
if (bucketNum != null) {
this.key = SaltingUtil.getSaltedKey(key, bucketNum);
this.keyPtr = new ImmutableBytesPtr(this.key);
@@ -795,7 +798,7 @@ public class PTableImpl implements PTable {
}
private void newMutations() {
- Put put = new Put(this.key);
+ Mutation put = this.hasOnDupKey ? new Increment(this.key) : new Put(this.key);
Delete delete = new Delete(this.key);
if (isWALDisabled()) {
put.setDurability(Durability.SKIP_WAL);
@@ -844,12 +847,6 @@ public class PTableImpl implements PTable {
}
@Override
- public void setValue(PColumn column, Object value) {
- byte[] byteValue = value == null ? ByteUtil.EMPTY_BYTE_ARRAY : column.getDataType().toBytes(value);
- setValue(column, byteValue);
- }
-
- @Override
public void setValue(PColumn column, byte[] byteValue) {
deleteRow = null;
byte[] family = column.getFamilyName().getBytes();
@@ -864,7 +861,10 @@ public class PTableImpl implements PTable {
// Store nulls for immutable tables otherwise default value would be used
removeIfPresent(setValues, family, qualifier);
removeIfPresent(unsetValues, family, qualifier);
- } else if (isNull && !getStoreNulls() && column.getExpressionStr() == null) {
+ } else if (isNull && !getStoreNulls() && !this.hasOnDupKey && column.getExpressionStr() == null) {
+ // Cannot use column delete marker when row has ON DUPLICATE KEY clause
+ // because we cannot change a Delete mutation to a Put mutation in the
+ // case of updates occurring due to the execution of the clause.
removeIfPresent(setValues, family, qualifier);
deleteQuietly(unsetValues, kvBuilder, kvBuilder.buildDeleteColumns(keyPtr, column
.getFamilyName().getBytesPtr(), column.getName().getBytesPtr(), ts));
@@ -1328,11 +1328,11 @@ public class PTableImpl implements PTable {
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null) return false;
- if (getClass() != obj.getClass()) return false;
- PTableImpl other = (PTableImpl) obj;
+ if (! (obj instanceof PTable)) return false;
+ PTable other = (PTable) obj;
if (key == null) {
- if (other.key != null) return false;
- } else if (!key.equals(other.key)) return false;
+ if (other.getKey() != null) return false;
+ } else if (!key.equals(other.getKey())) return false;
return true;
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java
index 6f8b19f..65cf075 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java
@@ -18,7 +18,6 @@ import org.apache.phoenix.expression.LiteralExpression;
import org.apache.phoenix.schema.types.PDataType;
public class ExpressionUtil {
-
private ExpressionUtil() {
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index 393da4c..7488c72 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -458,10 +458,6 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
return plan.getContext().getScan();
}
- private QueryPlan getQueryPlan(String query) throws SQLException {
- return getQueryPlan(query, Collections.emptyList());
- }
-
private QueryPlan getOptimizedQueryPlan(String query) throws SQLException {
return getOptimizedQueryPlan(query, Collections.emptyList());
}
@@ -2683,4 +2679,104 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
assertEquals("PLATFORM_ENTITY.GLOBAL_INDEX", plan.getContext().getCurrentTable().getTable().getName().getString());
}
}
+
+ @Test
+ public void testOnDupKeyForImmutableTable() throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+ try {
+ conn.createStatement().execute("CREATE TABLE t1 (k integer not null primary key, v bigint) IMMUTABLE_ROWS=true");
+ conn.createStatement().execute("UPSERT INTO t1 VALUES(0,0) ON DUPLICATE KEY UPDATE v = v + 1");
+ fail();
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_IMMUTABLE.getErrorCode(), e.getErrorCode());
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testUpdatePKOnDupKey() throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+ try {
+ conn.createStatement().execute("CREATE TABLE t1 (k1 integer not null, k2 integer not null, v bigint, constraint pk primary key (k1,k2))");
+ conn.createStatement().execute("UPSERT INTO t1 VALUES(0,0) ON DUPLICATE KEY UPDATE k2 = v + 1");
+ fail();
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.CANNOT_UPDATE_PK_ON_DUP_KEY.getErrorCode(), e.getErrorCode());
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testOnDupKeyTypeMismatch() throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+ try {
+ conn.createStatement().execute("CREATE TABLE t1 (k1 integer not null, k2 integer not null, v1 bigint, v2 varchar, constraint pk primary key (k1,k2))");
+ conn.createStatement().execute("UPSERT INTO t1 VALUES(0,0) ON DUPLICATE KEY UPDATE v1 = v2 || 'a'");
+ fail();
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.TYPE_MISMATCH.getErrorCode(), e.getErrorCode());
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testDuplicateColumnOnDupKeyUpdate() throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+ try {
+ conn.createStatement().execute("CREATE TABLE t1 (k1 integer not null, k2 integer not null, v1 bigint, v2 bigint, constraint pk primary key (k1,k2))");
+ conn.createStatement().execute("UPSERT INTO t1 VALUES(0,0) ON DUPLICATE KEY UPDATE v1 = v1 + 1, v1 = v2 + 2");
+ fail();
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.DUPLICATE_COLUMN_IN_ON_DUP_KEY.getErrorCode(), e.getErrorCode());
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testAggregationInOnDupKey() throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.createStatement().execute("CREATE TABLE t1 (k1 integer not null, k2 integer not null, v bigint, constraint pk primary key (k1,k2))");
+ try {
+ conn.createStatement().execute("UPSERT INTO t1 VALUES(0,0) ON DUPLICATE KEY UPDATE v = sum(v)");
+ fail();
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.AGGREGATION_NOT_ALLOWED_IN_ON_DUP_KEY.getErrorCode(), e.getErrorCode());
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testSequenceInOnDupKey() throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.createStatement().execute("CREATE TABLE t1 (k1 integer not null, k2 integer not null, v bigint, constraint pk primary key (k1,k2))");
+ conn.createStatement().execute("CREATE SEQUENCE s1");
+ try {
+ conn.createStatement().execute("UPSERT INTO t1 VALUES(0,0) ON DUPLICATE KEY UPDATE v = next value for s1");
+ fail();
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.INVALID_USE_OF_NEXT_VALUE_FOR.getErrorCode(), e.getErrorCode());
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testSCNInOnDupKey() throws Exception {
+ String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=100";
+ Connection conn = DriverManager.getConnection(url);
+ conn.createStatement().execute("CREATE TABLE t1 (k1 integer not null, k2 integer not null, v bigint, constraint pk primary key (k1,k2))");
+ try {
+ conn.createStatement().execute("UPSERT INTO t1 VALUES(0,0) ON DUPLICATE KEY UPDATE v = v + 1");
+ fail();
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.CANNOT_SET_SCN_IN_ON_DUP_KEY.getErrorCode(), e.getErrorCode());
+ } finally {
+ conn.close();
+ }
+ }
}