You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2016/10/27 21:00:16 UTC

[1/4] phoenix git commit: PHOENIX-6 Support ON DUPLICATE KEY construct

Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.1 5baac0be4 -> 837d114b2


http://git-wip-us.apache.org/repos/asf/phoenix/blob/837d114b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index d121d2d..d6adc71 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -17,20 +17,77 @@
  */
 package org.apache.phoenix.index;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.coprocessor.generated.PTableProtos;
+import org.apache.phoenix.exception.DataExceedsCapacityException;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+import org.apache.phoenix.expression.KeyValueColumnExpression;
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+import org.apache.phoenix.expression.visitor.StatelessTraverseAllExpressionVisitor;
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
+import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.write.IndexWriter;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PRow;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.TrustedByteArrayOutputStream;
+
+import com.google.common.collect.Lists;
 
 /**
  * Index builder for covered-columns index that ties into phoenix for faster use.
  */
 public class PhoenixIndexBuilder extends NonTxIndexBuilder {
+    public static final String ATOMIC_OP_ATTRIB = "_ATOMIC_OP_ATTRIB";
+    private static final byte[] ON_DUP_KEY_IGNORE_BYTES = new byte[] {1}; // boolean true
+    private static final int ON_DUP_KEY_HEADER_BYTE_SIZE = Bytes.SIZEOF_SHORT + Bytes.SIZEOF_BOOLEAN;
+    
 
+    private static List<Cell> flattenCells(Mutation m, int estimatedSize) throws IOException {
+        List<Cell> flattenedCells = Lists.newArrayListWithExpectedSize(estimatedSize);
+        flattenCells(m, flattenedCells);
+        return flattenedCells;
+    }
+    
+    private static void flattenCells(Mutation m, List<Cell> flattenedCells) throws IOException {
+        for (List<Cell> cells : m.getFamilyCellMap().values()) {
+            flattenedCells.addAll(cells);
+        }
+    }
+    
     @Override
     public IndexMetaData getIndexMetaData(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
         return new PhoenixIndexMetaData(env, miniBatchOp.getOperation(0).getAttributesMap());
@@ -53,4 +110,266 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder {
     @Override
     public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp, IndexMetaData context) throws IOException {
     }
+    
+    @Override
+    public boolean isAtomicOp(Mutation m) throws IOException {
+        return m.getAttribute(ATOMIC_OP_ATTRIB) != null;
+    }
+
+    private static void transferCells(Mutation source, Mutation target) {
+        target.getFamilyCellMap().putAll(source.getFamilyCellMap());
+    }
+    private static void transferAttributes(Mutation source, Mutation target) {
+        for (Map.Entry<String, byte[]> entry : source.getAttributesMap().entrySet()) {
+            target.setAttribute(entry.getKey(), entry.getValue());
+        }
+    }
+    private static List<Mutation> convertIncrementToPutInSingletonList(Increment inc) {
+        byte[] rowKey = inc.getRow();
+        Put put = new Put(rowKey);
+        transferCells(inc, put);
+        transferAttributes(inc, put);
+        return Collections.<Mutation>singletonList(put);
+    }
+    
+    @Override
+    public List<Mutation> executeAtomicOp(Increment inc) throws IOException {
+        byte[] opBytes = inc.getAttribute(ATOMIC_OP_ATTRIB);
+        if (opBytes == null) { // Unexpected
+            return null;
+        }
+        inc.setAttribute(ATOMIC_OP_ATTRIB, null);
+        Put put = null;
+        Delete delete = null;
+        // We cannot neither use the time stamp in the Increment to set the Get time range
+        // nor set the Put/Delete time stamp and have this be atomic as HBase does not
+        // handle that. Though we disallow using ON DUPLICATE KEY clause when the
+        // CURRENT_SCN is set, we still may have a time stamp set as of when the table
+        // was resolved on the client side. We need to ignore this as well due to limitations
+        // in HBase, but this isn't too bad as the time will be very close the the current
+        // time anyway.
+        long ts = HConstants.LATEST_TIMESTAMP;
+        byte[] rowKey = inc.getRow();
+        final Get get = new Get(rowKey);
+        if (isDupKeyIgnore(opBytes)) {
+            get.setFilter(new FirstKeyOnlyFilter());
+            Result result = this.env.getRegion().get(get);
+            return result.isEmpty() ? convertIncrementToPutInSingletonList(inc) : Collections.<Mutation>emptyList();
+        }
+        ByteArrayInputStream stream = new ByteArrayInputStream(opBytes);
+        DataInputStream input = new DataInputStream(stream);
+        boolean skipFirstOp = input.readBoolean();
+        short repeat = input.readShort();
+        final int[] estimatedSizeHolder = {0};
+        List<Pair<PTable, List<Expression>>> operations = Lists.newArrayListWithExpectedSize(3);
+        while (true) {
+            ExpressionVisitor<Void> visitor = new StatelessTraverseAllExpressionVisitor<Void>() {
+                @Override
+                public Void visit(KeyValueColumnExpression expression) {
+                    get.addColumn(expression.getColumnFamily(), expression.getColumnName());
+                    estimatedSizeHolder[0]++;
+                    return null;
+                }
+            };
+            try {
+                int nExpressions = WritableUtils.readVInt(input);
+                List<Expression>expressions = Lists.newArrayListWithExpectedSize(nExpressions);
+                for (int i = 0; i < nExpressions; i++) {
+                    Expression expression = ExpressionType.values()[WritableUtils.readVInt(input)].newInstance();
+                    expression.readFields(input);
+                    expressions.add(expression);
+                    expression.accept(visitor);                    
+                }
+                PTableProtos.PTable tableProto = PTableProtos.PTable.parseDelimitedFrom(input);
+                PTable table = PTableImpl.createFromProto(tableProto);
+                operations.add(new Pair<>(table, expressions));
+            } catch (EOFException e) {
+                break;
+            }
+        }
+        int estimatedSize = estimatedSizeHolder[0];
+        if (get.getFamilyMap().isEmpty()) {
+            get.setFilter(new FirstKeyOnlyFilter());
+        }
+        MultiKeyValueTuple tuple;
+        List<Cell>cells = ((HRegion)this.env.getRegion()).get(get, false);
+        if (cells.isEmpty()) {
+            if (skipFirstOp) {
+                if (operations.size() <= 1 && repeat <= 1) {
+                    return convertIncrementToPutInSingletonList(inc);
+                }
+                repeat--; // Skip first operation (if first wasn't ON DUPLICATE KEY IGNORE)
+            }
+            // Base current state off of new row
+            tuple = new MultiKeyValueTuple(flattenCells(inc, estimatedSize));
+        } else {
+            // Base current state off of existing row
+            tuple = new MultiKeyValueTuple(cells);
+        }
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        for (int opIndex = 0; opIndex < operations.size(); opIndex++) {
+            Pair<PTable, List<Expression>> operation = operations.get(opIndex);
+            PTable table = operation.getFirst();
+            List<Expression> expressions = operation.getSecond();
+            for (int j = 0; j < repeat; j++) { // repeater loop
+                ptr.set(rowKey);
+                PRow row = table.newRow(GenericKeyValueBuilder.INSTANCE, ts, ptr, false);
+                for (int i = 0; i < expressions.size(); i++) {
+                    Expression expression = expressions.get(i);
+                    ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
+                    expression.evaluate(tuple, ptr);
+                    PColumn column = table.getColumns().get(i + 1);
+                    Object value = expression.getDataType().toObject(ptr, column.getSortOrder());
+                    // We are guaranteed that the two column will have the
+                    // same type.
+                    if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(),
+                            expression.getMaxLength(), expression.getScale(), column.getMaxLength(),
+                            column.getScale())) {
+                        throw new DataExceedsCapacityException(column.getDataType(), column.getMaxLength(),
+                            column.getScale());
+                    }
+                    column.getDataType().coerceBytes(ptr, value, expression.getDataType(), expression.getMaxLength(),
+                        expression.getScale(), expression.getSortOrder(),column.getMaxLength(), column.getScale(),
+                        column.getSortOrder(), table.rowKeyOrderOptimizable());
+                    byte[] bytes = ByteUtil.copyKeyBytesIfNecessary(ptr);
+                    row.setValue(column, bytes);
+                }
+                List<Cell> flattenedCells = Lists.newArrayListWithExpectedSize(estimatedSize);
+                List<Mutation> mutations = row.toRowMutations();
+                for (Mutation source : mutations) {
+                    flattenCells(source, flattenedCells);
+                }
+                tuple.setKeyValues(flattenedCells);
+            }
+            // Repeat only applies to first statement
+            repeat = 1;
+        }
+        
+        List<Mutation> mutations = Lists.newArrayListWithExpectedSize(2);
+        for (int i = 0; i < tuple.size(); i++) {
+            Cell cell = tuple.getValue(i);
+            if (Type.codeToType(cell.getTypeByte()) == Type.Put) {
+                if (put == null) {
+                    put = new Put(rowKey);
+                    transferAttributes(inc, put);
+                    mutations.add(put);
+                }
+                put.add(cell);
+            } else {
+                if (delete == null) {
+                    delete = new Delete(rowKey);
+                    transferAttributes(inc, delete);
+                    mutations.add(delete);
+                }
+                delete.addDeleteMarker(cell);
+            }
+        }
+        return mutations;
+    }
+
+    public static byte[] serializeOnDupKeyIgnore() {
+        return ON_DUP_KEY_IGNORE_BYTES;
+    }
+    
+    /**
+     * Serialize ON DUPLICATE KEY UPDATE info with the following format:
+     * 1) Boolean value tracking whether or not to execute the first ON DUPLICATE KEY clause.
+     *    We know the clause should be executed when there are other UPSERT VALUES clauses earlier in
+     *    the same batch for this row key. We need this for two main cases: 
+     *       UPSERT VALUES followed by UPSERT VALUES ON DUPLICATE KEY UPDATE
+     *       UPSERT VALUES ON DUPLICATE KEY IGNORE followed by UPSERT VALUES ON DUPLICATE KEY UPDATE
+     * 2) Short value tracking how many times the next first clause should be executed. This
+     *    optimizes the same clause be executed many times by only serializing it once.
+     * 3) Repeating {List<Expression>, PTable} pairs that encapsulate the ON DUPLICATE KEY clause.
+     * @param table table representing columns being updated
+     * @param expressions list of expressions to evaluate for updating columns
+     * @return serialized byte array representation of ON DUPLICATE KEY UPDATE info
+     */
+    public static byte[] serializeOnDupKeyUpdate(PTable table, List<Expression> expressions) {
+        PTableProtos.PTable ptableProto = PTableImpl.toProto(table);
+        int size = ptableProto.getSerializedSize();
+        try (ByteArrayOutputStream stream = new ByteArrayOutputStream(size * 2)) {
+            DataOutputStream output = new DataOutputStream(stream);
+            output.writeBoolean(true); // Skip this ON DUPLICATE KEY clause if row already exists
+            output.writeShort(1); // Execute this ON DUPLICATE KEY once
+            WritableUtils.writeVInt(output, expressions.size());
+            for (int i = 0; i < expressions.size(); i++) {
+                Expression expression = expressions.get(i);
+                WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal());
+                expression.write(output);
+            }
+            ptableProto.writeDelimitedTo(output);
+            return stream.toByteArray();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+    
+    private static byte[] doNotSkipFirstOnDupKey(byte[] oldOnDupKeyBytes) {
+        byte[] newOnDupKeyBytes = Arrays.copyOf(oldOnDupKeyBytes, oldOnDupKeyBytes.length);
+        newOnDupKeyBytes[0] = 0; // false means do not skip first ON DUPLICATE KEY
+        return newOnDupKeyBytes;
+    }
+
+    public static byte[] combineOnDupKey(byte[] oldOnDupKeyBytes, byte[] newOnDupKeyBytes) {
+        // If old ON DUPLICATE KEY is null, then the new value always takes effect
+        // If new ON DUPLICATE KEY is null, then reset back to null
+        if (oldOnDupKeyBytes == null || newOnDupKeyBytes == null) {
+            if (newOnDupKeyBytes == null) {
+                return newOnDupKeyBytes;
+            }
+            return doNotSkipFirstOnDupKey(newOnDupKeyBytes);
+        }
+        // If the new UPSERT VALUES statement has an ON DUPLICATE KEY IGNORE, and there
+        // is an already existing UPSERT VALUES statement with an ON DUPLICATE KEY clause,
+        // then we can just keep that one as the new one has no impact.
+        if (isDupKeyIgnore(newOnDupKeyBytes)) {
+            return oldOnDupKeyBytes;
+        }
+        boolean isOldDupKeyIgnore = isDupKeyIgnore(oldOnDupKeyBytes);
+        try (TrustedByteArrayOutputStream stream = new TrustedByteArrayOutputStream(Math.max(0, oldOnDupKeyBytes.length - ON_DUP_KEY_HEADER_BYTE_SIZE) + newOnDupKeyBytes.length); 
+                ByteArrayInputStream oldStream = new ByteArrayInputStream(oldOnDupKeyBytes); 
+                ByteArrayInputStream newStream = new ByteArrayInputStream(newOnDupKeyBytes);
+                DataOutputStream output = new DataOutputStream(stream);
+                DataInputStream oldInput = new DataInputStream(oldStream);
+                DataInputStream newInput = new DataInputStream(newStream)) {
+            
+            boolean execute1 = oldInput.readBoolean();
+            newInput.readBoolean(); // ignore
+            int repeating2 = newInput.readShort();
+            if (isOldDupKeyIgnore) {
+                output.writeBoolean(false); // Will force subsequent ON DUPLICATE KEY UPDATE statement to execute
+                output.writeShort(repeating2);
+                output.write(newOnDupKeyBytes, ON_DUP_KEY_HEADER_BYTE_SIZE, newOnDupKeyBytes.length - ON_DUP_KEY_HEADER_BYTE_SIZE);
+            } else {
+                int repeating1 = oldInput.readShort();
+                if (Bytes.compareTo(
+                    oldOnDupKeyBytes, ON_DUP_KEY_HEADER_BYTE_SIZE, oldOnDupKeyBytes.length - ON_DUP_KEY_HEADER_BYTE_SIZE, 
+                    newOnDupKeyBytes, Bytes.SIZEOF_SHORT + Bytes.SIZEOF_BOOLEAN, oldOnDupKeyBytes.length - ON_DUP_KEY_HEADER_BYTE_SIZE) == 0) {
+                // If both old and new ON DUPLICATE KEY UPDATE clauses match,
+                // reduce the size of data we're sending over the wire.
+                // TODO: optimization size of RPC more.
+                output.writeBoolean(execute1);
+                output.writeShort(repeating1 + repeating2);
+                output.write(newOnDupKeyBytes, ON_DUP_KEY_HEADER_BYTE_SIZE, newOnDupKeyBytes.length - ON_DUP_KEY_HEADER_BYTE_SIZE);
+                } else {
+                    output.writeBoolean(execute1);
+                    output.writeShort(repeating1); // retain first ON DUPLICATE KEY UPDATE having repeated
+                    output.write(oldOnDupKeyBytes, ON_DUP_KEY_HEADER_BYTE_SIZE, oldOnDupKeyBytes.length - ON_DUP_KEY_HEADER_BYTE_SIZE);
+                    // If the new ON DUPLICATE KEY UPDATE was repeating, we need to write it multiple times as only the first
+                    // statement is effected by the repeating amount
+                    for (int i = 0; i < repeating2; i++) {
+                        output.write(newOnDupKeyBytes, ON_DUP_KEY_HEADER_BYTE_SIZE, newOnDupKeyBytes.length - ON_DUP_KEY_HEADER_BYTE_SIZE);
+                    }
+                }
+            }
+            return stream.toByteArray();
+        } catch (IOException e) { // Shouldn't be possible with ByteInput/Output streams
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static boolean isDupKeyIgnore(byte[] onDupKeyBytes) {
+        return onDupKeyBytes != null && Bytes.compareTo(ON_DUP_KEY_IGNORE_BYTES, onDupKeyBytes) == 0;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/837d114b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 60e32e5..d562f44 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -595,8 +595,10 @@ public class PhoenixStatement implements Statement, SQLCloseable {
     }
 
     private static class ExecutableUpsertStatement extends UpsertStatement implements CompilableStatement {
-        private ExecutableUpsertStatement(NamedTableNode table, HintNode hintNode, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount, Map<String, UDFParseNode> udfParseNodes) {
-            super(table, hintNode, columns, values, select, bindCount, udfParseNodes);
+        private ExecutableUpsertStatement(NamedTableNode table, HintNode hintNode, List<ColumnName> columns,
+                List<ParseNode> values, SelectStatement select, int bindCount, Map<String, UDFParseNode> udfParseNodes,
+                List<Pair<ColumnName,ParseNode>> onDupKeyPairs) {
+            super(table, hintNode, columns, values, select, bindCount, udfParseNodes, onDupKeyPairs);
         }
 
         @SuppressWarnings("unchecked")
@@ -1203,8 +1205,9 @@ public class PhoenixStatement implements Statement, SQLCloseable {
         }
 
         @Override
-        public ExecutableUpsertStatement upsert(NamedTableNode table, HintNode hintNode, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount, Map<String, UDFParseNode> udfParseNodes) {
-            return new ExecutableUpsertStatement(table, hintNode, columns, values, select, bindCount, udfParseNodes);
+        public ExecutableUpsertStatement upsert(NamedTableNode table, HintNode hintNode, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount, 
+                Map<String, UDFParseNode> udfParseNodes, List<Pair<ColumnName,ParseNode>> onDupKeyPairs) {
+            return new ExecutableUpsertStatement(table, hintNode, columns, values, select, bindCount, udfParseNodes, onDupKeyPairs);
         }
         
         @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/837d114b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index 6b58bed..977ca4c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -706,8 +706,11 @@ public class ParseNodeFactory {
                 orderBy == null ? Collections.<OrderByNode>emptyList() : orderBy, limit, offset, bindCount, isAggregate, hasSequence, selects == null ? Collections.<SelectStatement>emptyList() : selects, udfParseNodes);
     } 
     
-    public UpsertStatement upsert(NamedTableNode table, HintNode hint, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount, Map<String, UDFParseNode> udfParseNodes) {
-        return new UpsertStatement(table, hint, columns, values, select, bindCount, udfParseNodes);
+    public UpsertStatement upsert(NamedTableNode table, HintNode hint, List<ColumnName> columns, List<ParseNode> values,
+            SelectStatement select, int bindCount, 
+            Map<String, UDFParseNode> udfParseNodes,
+            List<Pair<ColumnName,ParseNode>> onDupKeyPairs) {
+        return new UpsertStatement(table, hint, columns, values, select, bindCount, udfParseNodes, onDupKeyPairs);
     }
 
     public DeleteStatement delete(NamedTableNode table, HintNode hint, ParseNode node, List<OrderByNode> orderBy, LimitNode limit, int bindCount, Map<String, UDFParseNode> udfParseNodes) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/837d114b/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
index 48698bd..fca7463 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
@@ -21,20 +21,24 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hbase.util.Pair;
+
 public class UpsertStatement extends DMLStatement {
     private final List<ColumnName> columns;
     private final List<ParseNode> values;
     private final SelectStatement select;
     private final HintNode hint;
+    private final List<Pair<ColumnName,ParseNode>> onDupKeyPairs;
 
     public UpsertStatement(NamedTableNode table, HintNode hint, List<ColumnName> columns,
             List<ParseNode> values, SelectStatement select, int bindCount,
-            Map<String, UDFParseNode> udfParseNodes) {
+            Map<String, UDFParseNode> udfParseNodes, List<Pair<ColumnName,ParseNode>> onDupKeyPairs) {
         super(table, bindCount, udfParseNodes);
         this.columns = columns == null ? Collections.<ColumnName>emptyList() : columns;
         this.values = values;
         this.select = select;
         this.hint = hint == null ? HintNode.EMPTY_HINT_NODE : hint;
+        this.onDupKeyPairs = onDupKeyPairs;
     }
 
     public List<ColumnName> getColumns() {
@@ -52,4 +56,8 @@ public class UpsertStatement extends DMLStatement {
     public HintNode getHint() {
         return hint;
     }
+
+    public List<Pair<ColumnName,ParseNode>> getOnDupKeyPairs() {
+        return onDupKeyPairs;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/837d114b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
index 798706e..aca8219 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
@@ -89,4 +89,14 @@ public class DelegateColumn extends DelegateDatum implements PColumn {
 	public boolean isDynamic() {
 		return getDelegate().isDynamic();
 	}
+
+	@Override
+	public int hashCode() {
+	    return getDelegate().hashCode();
+	}
+	
+	@Override
+    public boolean equals(Object o) {
+	    return getDelegate().equals(o);
+	}
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/837d114b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index 3ee012f..7d39dfe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -106,13 +106,13 @@ public class DelegateTable implements PTable {
     }
 
     @Override
-    public PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, byte[]... values) {
-        return delegate.newRow(builder, ts, key, values);
+    public PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, boolean hasOnDupKey, byte[]... values) {
+        return delegate.newRow(builder, ts, key, hasOnDupKey, values);
     }
 
     @Override
-    public PRow newRow(KeyValueBuilder builder, ImmutableBytesWritable key, byte[]... values) {
-        return delegate.newRow(builder, key, values);
+    public PRow newRow(KeyValueBuilder builder, ImmutableBytesWritable key, boolean hasOnDupKey, byte[]... values) {
+        return delegate.newRow(builder, key, hasOnDupKey, values);
     }
 
     @Override
@@ -280,4 +280,14 @@ public class DelegateTable implements PTable {
     public boolean isAppendOnlySchema() {
         return delegate.isAppendOnlySchema();
     }
+    
+    @Override
+    public int hashCode() {
+        return delegate.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        return delegate.equals(obj);
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/837d114b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
index a556f76..ca827d8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
@@ -170,14 +170,14 @@ public class PColumnImpl implements PColumn {
     public boolean equals(Object obj) {
         if (this == obj) return true;
         if (obj == null) return false;
-        if (getClass() != obj.getClass()) return false;
-        PColumnImpl other = (PColumnImpl)obj;
+        if (! (obj instanceof PColumn) ) return false;
+        PColumn other = (PColumn)obj;
         if (familyName == null) {
-            if (other.familyName != null) return false;
-        } else if (!familyName.equals(other.familyName)) return false;
+            if (other.getFamilyName() != null) return false;
+        } else if (!familyName.equals(other.getFamilyName())) return false;
         if (name == null) {
-            if (other.name != null) return false;
-        } else if (!name.equals(other.name)) return false;
+            if (other.getName() != null) return false;
+        } else if (!name.equals(other.getName())) return false;
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/837d114b/phoenix-core/src/main/java/org/apache/phoenix/schema/PRow.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PRow.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PRow.java
index 30deee6..fde83ba 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PRow.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PRow.java
@@ -40,7 +40,7 @@ public interface PRow {
     /**
      * Get the list of {@link org.apache.hadoop.hbase.client.Mutation} used to
      * update an HTable after all mutations through calls to
-     * {@link #setValue(PColumn, Object)} or {@link #delete()}.
+     * {@link #setValue(PColumn, byte[])} or {@link #delete()}.
      * @return the list of mutations representing all changes made to a row
      * @throws ConstraintViolationException if row data violates schema
      * constraint
@@ -54,15 +54,6 @@ public interface PRow {
      * @throws ConstraintViolationException if row data violates schema
      * constraint
      */
-    public void setValue(PColumn col, Object value);
-    
-    /**
-     * Set a column value in the row
-     * @param col the column for which the value is being set
-     * @param value the value
-     * @throws ConstraintViolationException if row data violates schema
-     * constraint
-     */
     public void setValue(PColumn col, byte[] value);
     
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/837d114b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index b585323..01e8afe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -226,26 +226,28 @@ public interface PTable extends PMetaDataEntity {
      * and the optional key values specified using values.
      * @param ts the timestamp that the key value will have when committed
      * @param key the row key of the key value
+     * @param hasOnDupKey true if row has an ON DUPLICATE KEY clause and false otherwise.
      * @param values the optional key values
      * @return the new row. Use {@link org.apache.phoenix.schema.PRow#toRowMutations()} to
      * generate the Row to send to the HBase server.
      * @throws ConstraintViolationException if row data violates schema
      * constraint
      */
-    PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, byte[]... values);
+    PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, boolean hasOnDupKey, byte[]... values);
 
     /**
      * Creates a new row for the PK values (from {@link #newKey(ImmutableBytesWritable, byte[][])}
      * and the optional key values specified using values. The timestamp of the key value
      * will be set by the HBase server.
      * @param key the row key of the key value
+     * @param hasOnDupKey true if row has an ON DUPLICATE KEY clause and false otherwise.
      * @param values the optional key values
      * @return the new row. Use {@link org.apache.phoenix.schema.PRow#toRowMutations()} to
      * generate the row to send to the HBase server.
      * @throws ConstraintViolationException if row data violates schema
      * constraint
      */
-    PRow newRow(KeyValueBuilder builder, ImmutableBytesWritable key, byte[]... values);
+    PRow newRow(KeyValueBuilder builder, ImmutableBytesWritable key, boolean hasOnDupKey, byte[]... values);
 
     /**
      * Formulates a row key using the values provided. The values must be in

http://git-wip-us.apache.org/repos/asf/phoenix/blob/837d114b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 773ce76..627740b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -241,7 +242,7 @@ public class PTableImpl implements PTable {
                 table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
     }
 
-    public static PTableImpl makePTable(PTable table, List<PColumn> columns) throws SQLException {
+    public static PTableImpl makePTable(PTable table, Collection<PColumn> columns) throws SQLException {
         return new PTableImpl(
                 table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), table.getTimeStamp(),
                 table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
@@ -251,7 +252,7 @@ public class PTableImpl implements PTable {
                 table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
     }
 
-    public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns) throws SQLException {
+    public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, Collection<PColumn> columns) throws SQLException {
         return new PTableImpl(
                 table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp,
                 sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
@@ -261,7 +262,7 @@ public class PTableImpl implements PTable {
                 table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
     }
 
-    public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows) throws SQLException {
+    public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, Collection<PColumn> columns, boolean isImmutableRows) throws SQLException {
         return new PTableImpl(
                 table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp,
                 sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
@@ -271,7 +272,7 @@ public class PTableImpl implements PTable {
                 table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
     }
     
-    public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows, boolean isWalDisabled,
+    public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, Collection<PColumn> columns, boolean isImmutableRows, boolean isWalDisabled,
             boolean isMultitenant, boolean storeNulls, boolean isTransactional, long updateCacheFrequency, boolean isNamespaceMapped) throws SQLException {
         return new PTableImpl(
                 table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp,
@@ -715,8 +716,8 @@ public class PTableImpl implements PTable {
         }
     }
 
-    private PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, int i, byte[]... values) {
-        PRow row = new PRowImpl(builder, key, ts, getBucketNum());
+    private PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, int i, boolean hasOnDupKey, byte[]... values) {
+        PRow row = new PRowImpl(builder, key, ts, getBucketNum(), hasOnDupKey);
         if (i < values.length) {
             for (PColumnFamily family : getColumnFamilies()) {
                 for (PColumn column : family.getColumns()) {
@@ -731,13 +732,13 @@ public class PTableImpl implements PTable {
 
     @Override
     public PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key,
-            byte[]... values) {
-        return newRow(builder, ts, key, 0, values);
+            boolean hasOnDupKey, byte[]... values) {
+        return newRow(builder, ts, key, 0, hasOnDupKey, values);
     }
 
     @Override
-    public PRow newRow(KeyValueBuilder builder, ImmutableBytesWritable key, byte[]... values) {
-        return newRow(builder, HConstants.LATEST_TIMESTAMP, key, values);
+    public PRow newRow(KeyValueBuilder builder, ImmutableBytesWritable key, boolean hasOnDupKey, byte[]... values) {
+        return newRow(builder, HConstants.LATEST_TIMESTAMP, key, hasOnDupKey, values);
     }
 
     @Override
@@ -775,14 +776,16 @@ public class PTableImpl implements PTable {
         // default to the generic builder, and only override when we know on the client
         private final KeyValueBuilder kvBuilder;
 
-        private Put setValues;
+        private Mutation setValues;
         private Delete unsetValues;
         private Mutation deleteRow;
         private final long ts;
+        private final boolean hasOnDupKey;
 
-        public PRowImpl(KeyValueBuilder kvBuilder, ImmutableBytesWritable key, long ts, Integer bucketNum) {
+        public PRowImpl(KeyValueBuilder kvBuilder, ImmutableBytesWritable key, long ts, Integer bucketNum, boolean hasOnDupKey) {
             this.kvBuilder = kvBuilder;
             this.ts = ts;
+            this.hasOnDupKey = hasOnDupKey;
             if (bucketNum != null) {
                 this.key = SaltingUtil.getSaltedKey(key, bucketNum);
                 this.keyPtr = new ImmutableBytesPtr(this.key);
@@ -795,7 +798,7 @@ public class PTableImpl implements PTable {
         }
 
         private void newMutations() {
-            Put put = new Put(this.key);
+            Mutation put = this.hasOnDupKey ? new Increment(this.key) : new Put(this.key);
             Delete delete = new Delete(this.key);
             if (isWALDisabled()) {
                 put.setDurability(Durability.SKIP_WAL);
@@ -844,12 +847,6 @@ public class PTableImpl implements PTable {
         }
 
         @Override
-        public void setValue(PColumn column, Object value) {
-            byte[] byteValue = value == null ? ByteUtil.EMPTY_BYTE_ARRAY : column.getDataType().toBytes(value);
-            setValue(column, byteValue);
-        }
-
-        @Override
         public void setValue(PColumn column, byte[] byteValue) {
             deleteRow = null;
             byte[] family = column.getFamilyName().getBytes();
@@ -864,7 +861,10 @@ public class PTableImpl implements PTable {
                 // Store nulls for immutable tables otherwise default value would be used
                 removeIfPresent(setValues, family, qualifier);
                 removeIfPresent(unsetValues, family, qualifier);
-            } else if (isNull && !getStoreNulls() && column.getExpressionStr() == null) {
+            } else if (isNull && !getStoreNulls() && !this.hasOnDupKey && column.getExpressionStr() == null) {
+                // Cannot use column delete marker when row has ON DUPLICATE KEY clause
+                // because we cannot change a Delete mutation to a Put mutation in the
+                // case of updates occurring due to the execution of the clause.
                 removeIfPresent(setValues, family, qualifier);
                 deleteQuietly(unsetValues, kvBuilder, kvBuilder.buildDeleteColumns(keyPtr, column
                             .getFamilyName().getBytesPtr(), column.getName().getBytesPtr(), ts));
@@ -1328,11 +1328,11 @@ public class PTableImpl implements PTable {
     public boolean equals(Object obj) {
         if (this == obj) return true;
         if (obj == null) return false;
-        if (getClass() != obj.getClass()) return false;
-        PTableImpl other = (PTableImpl) obj;
+        if (! (obj instanceof PTable)) return false;
+        PTable other = (PTable) obj;
         if (key == null) {
-            if (other.key != null) return false;
-        } else if (!key.equals(other.key)) return false;
+            if (other.getKey() != null) return false;
+        } else if (!key.equals(other.getKey())) return false;
         return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/837d114b/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java
index 6f8b19f..65cf075 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java
@@ -18,7 +18,6 @@ import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.schema.types.PDataType;
 
 public class ExpressionUtil {
-
 	private ExpressionUtil() {
 	}
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/837d114b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index 393da4c..7488c72 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -458,10 +458,6 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
         return plan.getContext().getScan();
     }
     
-    private QueryPlan getQueryPlan(String query) throws SQLException {
-        return getQueryPlan(query, Collections.emptyList());
-    }
-
     private QueryPlan getOptimizedQueryPlan(String query) throws SQLException {
         return getOptimizedQueryPlan(query, Collections.emptyList());
     }
@@ -2683,4 +2679,104 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
             assertEquals("PLATFORM_ENTITY.GLOBAL_INDEX", plan.getContext().getCurrentTable().getTable().getName().getString());
         }
     }
+
+    @Test
+    public void testOnDupKeyForImmutableTable() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        try {
+            conn.createStatement().execute("CREATE TABLE t1 (k integer not null primary key, v bigint) IMMUTABLE_ROWS=true");
+            conn.createStatement().execute("UPSERT INTO t1 VALUES(0,0) ON DUPLICATE KEY UPDATE v = v + 1");
+            fail();
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_IMMUTABLE.getErrorCode(), e.getErrorCode());
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testUpdatePKOnDupKey() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        try {
+            conn.createStatement().execute("CREATE TABLE t1 (k1 integer not null, k2 integer not null, v bigint, constraint pk primary key (k1,k2))");
+            conn.createStatement().execute("UPSERT INTO t1 VALUES(0,0) ON DUPLICATE KEY UPDATE k2 = v + 1");
+            fail();
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.CANNOT_UPDATE_PK_ON_DUP_KEY.getErrorCode(), e.getErrorCode());
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testOnDupKeyTypeMismatch() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        try {
+            conn.createStatement().execute("CREATE TABLE t1 (k1 integer not null, k2 integer not null, v1 bigint, v2 varchar, constraint pk primary key (k1,k2))");
+            conn.createStatement().execute("UPSERT INTO t1 VALUES(0,0) ON DUPLICATE KEY UPDATE v1 = v2 || 'a'");
+            fail();
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.TYPE_MISMATCH.getErrorCode(), e.getErrorCode());
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testDuplicateColumnOnDupKeyUpdate() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        try {
+            conn.createStatement().execute("CREATE TABLE t1 (k1 integer not null, k2 integer not null, v1 bigint, v2 bigint, constraint pk primary key (k1,k2))");
+            conn.createStatement().execute("UPSERT INTO t1 VALUES(0,0) ON DUPLICATE KEY UPDATE v1 = v1 + 1, v1 = v2 + 2");
+            fail();
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.DUPLICATE_COLUMN_IN_ON_DUP_KEY.getErrorCode(), e.getErrorCode());
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testAggregationInOnDupKey() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute("CREATE TABLE t1 (k1 integer not null, k2 integer not null, v bigint, constraint pk primary key (k1,k2))");
+        try {
+            conn.createStatement().execute("UPSERT INTO t1 VALUES(0,0) ON DUPLICATE KEY UPDATE v = sum(v)");
+            fail();
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.AGGREGATION_NOT_ALLOWED_IN_ON_DUP_KEY.getErrorCode(), e.getErrorCode());
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testSequenceInOnDupKey() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute("CREATE TABLE t1 (k1 integer not null, k2 integer not null, v bigint, constraint pk primary key (k1,k2))");
+        conn.createStatement().execute("CREATE SEQUENCE s1");
+        try {
+            conn.createStatement().execute("UPSERT INTO t1 VALUES(0,0) ON DUPLICATE KEY UPDATE v = next value for s1");
+            fail();
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.INVALID_USE_OF_NEXT_VALUE_FOR.getErrorCode(), e.getErrorCode());
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testSCNInOnDupKey() throws Exception {
+        String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=100";
+        Connection conn = DriverManager.getConnection(url);
+        conn.createStatement().execute("CREATE TABLE t1 (k1 integer not null, k2 integer not null, v bigint, constraint pk primary key (k1,k2))");
+        try {
+            conn.createStatement().execute("UPSERT INTO t1 VALUES(0,0) ON DUPLICATE KEY UPDATE v = v + 1");
+            fail();
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.CANNOT_SET_SCN_IN_ON_DUP_KEY.getErrorCode(), e.getErrorCode());
+        } finally {
+            conn.close();
+        }
+    }
 }


[3/4] phoenix git commit: PHOENIX-3420 Upgrade to sqlline 1.2.0

Posted by ja...@apache.org.
PHOENIX-3420 Upgrade to sqlline 1.2.0


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/0bc6f6d6
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/0bc6f6d6
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/0bc6f6d6

Branch: refs/heads/4.x-HBase-1.1
Commit: 0bc6f6d66144f7f842e396ad149ba5ecbd33bdba
Parents: f140100
Author: James Taylor <ja...@apache.org>
Authored: Thu Oct 27 13:08:32 2016 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Thu Oct 27 13:59:12 2016 -0700

----------------------------------------------------------------------
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/0bc6f6d6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 400975d..231a0e8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -82,7 +82,7 @@
     <commons-lang.version>2.5</commons-lang.version>
     <commons-logging.version>1.2</commons-logging.version>
     <commons-csv.version>1.0</commons-csv.version>
-    <sqlline.version>1.1.9</sqlline.version>
+    <sqlline.version>1.2.0</sqlline.version>
     <guava.version>13.0.1</guava.version>
     <flume.version>1.4.0</flume.version>
     <findbugs-annotations.version>1.3.9-1</findbugs-annotations.version>


[4/4] phoenix git commit: PHOENIX-3267 Replace use of SELECT null with CAST(null AS ) (Eric Lomore)

Posted by ja...@apache.org.
PHOENIX-3267 Replace use of SELECT null with CAST(null AS <type>) (Eric Lomore)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f1401007
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f1401007
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f1401007

Branch: refs/heads/4.x-HBase-1.1
Commit: f1401007e0edf1084c00196e73225848cc4a56bc
Parents: 5baac0b
Author: James Taylor <ja...@apache.org>
Authored: Thu Oct 27 11:48:02 2016 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Thu Oct 27 13:59:12 2016 -0700

----------------------------------------------------------------------
 .../src/it/java/org/apache/phoenix/end2end/AggregateQueryIT.java | 2 +-
 .../src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java   | 2 +-
 .../it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java | 2 +-
 .../org/apache/phoenix/jdbc/PhoenixResultSetMetadataTest.java    | 4 ++--
 4 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/f1401007/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateQueryIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateQueryIT.java
index 01cc2c1..c689373 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateQueryIT.java
@@ -77,7 +77,7 @@ public class AggregateQueryIT extends BaseQueryIT {
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 70));
         conn = DriverManager.getConnection(getUrl(), props);
         conn.setAutoCommit(true);
-        conn.createStatement().execute("UPSERT INTO atable(organization_id,entity_id,a_integer) SELECT organization_id, entity_id, null FROM atable");
+        conn.createStatement().execute("UPSERT INTO atable(organization_id,entity_id,a_integer) SELECT organization_id, entity_id, CAST(null AS integer) FROM atable");
 
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 90));
         conn = DriverManager.getConnection(getUrl(), props);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f1401007/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
index 8c9c8eb..3561274 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
@@ -681,7 +681,7 @@ public class UpsertSelectIT extends BaseClientManagedTimeIT {
 
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 30));
         conn = DriverManager.getConnection(getUrl(), props);
-        conn.createStatement().execute("upsert into phoenix_test (id, ts) select id, null from phoenix_test where id <= 'bbb' limit 1");
+        conn.createStatement().execute("upsert into phoenix_test (id, ts) select id, CAST(null AS timestamp) from phoenix_test where id <= 'bbb' limit 1");
         conn.commit();
         conn.close();
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f1401007/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
index 6531d95..8942716 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
@@ -152,7 +152,7 @@ public class MutableIndexIT extends ParallelStatsDisabledIT {
             assertFalse(rs.next());
             
             stmt = conn.prepareStatement("UPSERT INTO " + fullTableName
-                    + "(varchar_pk, char_pk, int_pk, long_pk , decimal_pk, long_col2) SELECT varchar_pk, char_pk, int_pk, long_pk , decimal_pk, null FROM "
+                    + "(varchar_pk, char_pk, int_pk, long_pk , decimal_pk, long_col2) SELECT varchar_pk, char_pk, int_pk, long_pk , decimal_pk, CAST(null AS BIGINT) FROM "
                     + fullTableName + " WHERE long_col2=?");
             stmt.setLong(1,3L);
             assertEquals(1,stmt.executeUpdate());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f1401007/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixResultSetMetadataTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixResultSetMetadataTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixResultSetMetadataTest.java
index 9153595..17b3794 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixResultSetMetadataTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixResultSetMetadataTest.java
@@ -35,11 +35,11 @@ public class PhoenixResultSetMetadataTest extends BaseConnectionlessQueryTest {
         conn.createStatement().execute(
                 "CREATE TABLE T (pk1 CHAR(15) not null, pk2 VARCHAR not null,  v1 VARCHAR(15), v2 DATE, v3 VARCHAR " +
                 "CONSTRAINT pk PRIMARY KEY (pk1, pk2)) ");
-        ResultSet rs = conn.createStatement().executeQuery("SELECT pk1, pk2, v1, v2, NULL FROM T");
+        ResultSet rs = conn.createStatement().executeQuery("SELECT pk1, pk2, v1, v2, CAST(null AS varchar) FROM T");
         assertEquals(15, rs.getMetaData().getColumnDisplaySize(1));
         assertEquals(PhoenixResultSetMetaData.DEFAULT_DISPLAY_WIDTH, rs.getMetaData().getColumnDisplaySize(2));
         assertEquals(15, rs.getMetaData().getColumnDisplaySize(3));
         assertEquals(conn.unwrap(PhoenixConnection.class).getDatePattern().length(), rs.getMetaData().getColumnDisplaySize(4));
-        assertEquals(QueryConstants.NULL_DISPLAY_TEXT.length(), rs.getMetaData().getColumnDisplaySize(5));
+        assertEquals(40, rs.getMetaData().getColumnDisplaySize(5));
     }
 }


[2/4] phoenix git commit: PHOENIX-6 Support ON DUPLICATE KEY construct

Posted by ja...@apache.org.
PHOENIX-6 Support ON DUPLICATE KEY construct


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/837d114b
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/837d114b
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/837d114b

Branch: refs/heads/4.x-HBase-1.1
Commit: 837d114b20904f7ccd2a72208346f9ad48e1b665
Parents: 0bc6f6d
Author: James Taylor <ja...@apache.org>
Authored: Thu Oct 27 11:20:20 2016 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Thu Oct 27 13:59:12 2016 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/OnDuplicateKeyIT.java       | 523 +++++++++++++++++++
 .../phoenix/end2end/index/IndexTestUtil.java    |   6 +-
 .../org/apache/phoenix/tx/TransactionIT.java    |  15 +
 phoenix-core/src/main/antlr3/PhoenixSQL.g       |  24 +-
 .../apache/phoenix/compile/DeleteCompiler.java  |   6 +-
 .../apache/phoenix/compile/UpsertCompiler.java  | 104 +++-
 .../UngroupedAggregateRegionObserver.java       |   2 +-
 .../phoenix/exception/SQLExceptionCode.java     |   6 +
 .../apache/phoenix/execute/MutationState.java   |  32 +-
 .../org/apache/phoenix/hbase/index/Indexer.java |  92 +++-
 .../hbase/index/builder/BaseIndexBuilder.java   |  14 +-
 .../hbase/index/builder/IndexBuildManager.java  |  10 +
 .../hbase/index/builder/IndexBuilder.java       |  29 +-
 .../phoenix/hbase/index/covered/IndexCodec.java |   1 -
 .../hbase/index/util/KeyValueBuilder.java       |  15 +-
 .../phoenix/index/PhoenixIndexBuilder.java      | 319 +++++++++++
 .../apache/phoenix/jdbc/PhoenixStatement.java   |  11 +-
 .../apache/phoenix/parse/ParseNodeFactory.java  |   7 +-
 .../apache/phoenix/parse/UpsertStatement.java   |  10 +-
 .../apache/phoenix/schema/DelegateColumn.java   |  10 +
 .../apache/phoenix/schema/DelegateTable.java    |  18 +-
 .../org/apache/phoenix/schema/PColumnImpl.java  |  12 +-
 .../java/org/apache/phoenix/schema/PRow.java    |  11 +-
 .../java/org/apache/phoenix/schema/PTable.java  |   6 +-
 .../org/apache/phoenix/schema/PTableImpl.java   |  48 +-
 .../org/apache/phoenix/util/ExpressionUtil.java |   1 -
 .../phoenix/compile/QueryCompilerTest.java      | 104 +++-
 27 files changed, 1319 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/837d114b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
new file mode 100644
index 0000000..9a81026
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Lists;
+
+@RunWith(Parameterized.class)
+public class OnDuplicateKeyIT extends ParallelStatsDisabledIT {
+    private final String indexDDL;
+    
+    public OnDuplicateKeyIT(String indexDDL) {
+        this.indexDDL = indexDDL;
+    }
+    
+    @Parameters
+    public static Collection<Object> data() {
+        List<Object> testCases = Lists.newArrayList();
+        testCases.add(new String[] {
+                "",
+        });
+        testCases.add(new String[] {
+                "create index %s_IDX on %s(counter1) include (counter2)",
+        });
+        testCases.add(new String[] {
+                "create index %s_IDX on %s(counter1, counter2)",
+        });
+        testCases.add(new String[] {
+                "create local index %s_IDX on %s(counter1) include (counter2)",
+        });
+        testCases.add(new String[] {
+                "create local index %s_IDX on %s(counter1, counter2)",
+        });
+        return testCases;
+    }
+    
+    private void createIndex(Connection conn, String tableName) throws SQLException {
+        if (indexDDL == null || indexDDL.length() == 0) {
+            return;
+        }
+        String ddl = String.format(indexDDL, tableName, tableName);
+        conn.createStatement().execute(ddl);
+    }
+    
+    @Test
+    public void testNewAndUpdateOnSingleNumericColumn() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String tableName = generateUniqueName();
+        String ddl = " create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 smallint)";
+        conn.createStatement().execute(ddl);
+        createIndex(conn, tableName);
+        String dml = "UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1";
+        conn.createStatement().execute(dml);
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
+        assertTrue(rs.next());
+        assertEquals("a",rs.getString(1));
+        assertEquals(0,rs.getLong(2));
+        assertFalse(rs.next());
+        
+        conn.createStatement().execute(dml);
+        conn.commit();
+
+        rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
+        assertTrue(rs.next());
+        assertEquals("a",rs.getString(1));
+        assertEquals(1,rs.getLong(2));
+        assertFalse(rs.next());
+        
+        conn.close();
+    }
+    
+    @Test
+    public void testNewAndUpdateOnSingleNumericColumnWithOtherColumns() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String tableName = generateUniqueName();
+        String ddl = " create table " + tableName + "(k1 varchar, k2 varchar, counter1 varchar, counter2 date, other1 char(3), other2 varchar default 'f', constraint pk primary key (k1,k2))";
+        conn.createStatement().execute(ddl);
+        createIndex(conn, tableName);
+        String dml = "UPSERT INTO " + tableName + " VALUES('a','b','c',null,'eee') " + 
+                     "ON DUPLICATE KEY UPDATE counter1 = counter1 || CASE WHEN LENGTH(counter1) < 10 THEN 'SMALL' ELSE 'LARGE' END || k2 || other2 || other1 ";
+        conn.createStatement().execute(dml);
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
+        assertTrue(rs.next());
+        assertEquals("a",rs.getString(1));
+        assertEquals("b",rs.getString(2));
+        assertEquals("c",rs.getString(3));
+        assertEquals(null,rs.getDate(4));
+        assertEquals("eee",rs.getString(5));
+        assertEquals("f",rs.getString(6));
+        assertFalse(rs.next());
+        
+        conn.createStatement().execute(dml);
+        conn.commit();
+
+        rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
+        assertTrue(rs.next());
+        assertEquals("a",rs.getString(1));
+        assertEquals("b",rs.getString(2));
+        assertEquals("cSMALLbfeee",rs.getString(3));
+        assertEquals(null,rs.getDate(4));
+        assertEquals("eee",rs.getString(5));
+        assertEquals("f",rs.getString(6));
+        assertFalse(rs.next());
+        
+        conn.createStatement().execute(dml);
+        conn.commit();
+
+        rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
+        assertTrue(rs.next());
+        assertEquals("a",rs.getString(1));
+        assertEquals("b",rs.getString(2));
+        assertEquals("cSMALLbfeeeLARGEbfeee",rs.getString(3));
+        assertEquals(null,rs.getDate(4));
+        assertEquals("eee",rs.getString(5));
+        assertEquals("f",rs.getString(6));
+        assertFalse(rs.next());
+        
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a','b','c',null,'eee') " + 
+                "ON DUPLICATE KEY UPDATE counter1 = to_char(rand()), counter2 = current_date() + 1");
+        conn.commit();
+
+        rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
+        assertTrue(rs.next());
+        assertEquals("a",rs.getString(1));
+        assertEquals("b",rs.getString(2));
+        double d = Double.parseDouble(rs.getString(3));
+        assertTrue(d >= 0.0 && d <= 1.0);
+        Date date = rs.getDate(4);
+        assertTrue(date.after(new Date(System.currentTimeMillis())));
+        assertEquals("eee",rs.getString(5));
+        assertEquals("f",rs.getString(6));
+        assertFalse(rs.next());
+        
+        conn.close();
+    }
+    
+    @Test
+    public void testNewAndUpdateOnSingleVarcharColumn() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String tableName = generateUniqueName();
+        String ddl = " create table " + tableName + "(pk varchar primary key, counter1 varchar, counter2 smallint)";
+        conn.createStatement().execute(ddl);
+        createIndex(conn, tableName);
+        String dml = "UPSERT INTO " + tableName + " VALUES('a','b') ON DUPLICATE KEY UPDATE counter1 = counter1 || 'b'";
+        conn.createStatement().execute(dml);
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE substr(counter1,1,1) = 'b'");
+        assertTrue(rs.next());
+        assertEquals("a",rs.getString(1));
+        assertEquals("b",rs.getString(2));
+        assertFalse(rs.next());
+        
+        conn.createStatement().execute(dml);
+        conn.commit();
+
+        rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE substr(counter1,1,1) = 'b'");
+        assertTrue(rs.next());
+        assertEquals("a",rs.getString(1));
+        assertEquals("bb",rs.getString(2));
+        assertFalse(rs.next());
+        
+        conn.close();
+    }
+    
+    @Test
+    public void testDeleteOnSingleVarcharColumnAutoCommit() throws Exception {
+        testDeleteOnSingleVarcharColumn(true);
+    }
+    
+    @Test
+    public void testDeleteOnSingleVarcharColumnNoAutoCommit() throws Exception {
+        testDeleteOnSingleVarcharColumn(false);
+    }
+    
+    private void testDeleteOnSingleVarcharColumn(boolean autoCommit) throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(autoCommit);
+        String tableName = generateUniqueName();
+        String ddl = " create table " + tableName + "(pk varchar primary key, counter1 varchar, counter2 smallint)";
+        conn.createStatement().execute(ddl);
+        createIndex(conn, tableName);
+        String dml = "UPSERT INTO " + tableName + " VALUES('a','b') ON DUPLICATE KEY UPDATE counter1 = null";
+        conn.createStatement().execute(dml);
+        conn.createStatement().execute(dml);
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
+        assertTrue(rs.next());
+        assertEquals("a",rs.getString(1));
+        assertEquals(null,rs.getString(2));
+        assertFalse(rs.next());
+        
+        dml = "UPSERT INTO " + tableName + " VALUES('a','b',0)";
+        conn.createStatement().execute(dml);
+        dml = "UPSERT INTO " + tableName + " VALUES('a','b', 0) ON DUPLICATE KEY UPDATE counter1 = null, counter2 = counter2 + 1";
+        conn.createStatement().execute(dml);
+        dml = "UPSERT INTO " + tableName + " VALUES('a','b', 0) ON DUPLICATE KEY UPDATE counter1 = 'c', counter2 = counter2 + 1";
+        conn.createStatement().execute(dml);
+        conn.commit();
+
+        rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
+        assertTrue(rs.next());
+        assertEquals("a",rs.getString(1));
+        assertEquals("c",rs.getString(2));
+        assertEquals(2,rs.getInt(3));
+        assertFalse(rs.next());
+
+        conn.close();
+    }
+    
+    @Test
+    public void testIgnoreOnSingleColumn() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String tableName = generateUniqueName();
+        String ddl = " create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 bigint)";
+        conn.createStatement().execute(ddl);
+        createIndex(conn, tableName);
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',10)");
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
+        assertTrue(rs.next());
+        assertEquals("a",rs.getString(1));
+        assertEquals(10,rs.getLong(2));
+        assertFalse(rs.next());
+        
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY IGNORE");
+        conn.commit();
+
+        rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
+        assertTrue(rs.next());
+        assertEquals("a",rs.getString(1));
+        assertEquals(10,rs.getLong(2));
+        assertFalse(rs.next()); 
+        
+        conn.close();
+    }
+    
+    @Test
+    public void testInitialIgnoreWithUpdateOnSingleColumn() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String tableName = generateUniqueName();
+        String ddl = " create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 bigint)";
+        conn.createStatement().execute(ddl);
+        createIndex(conn, tableName);
+        // Test ignore combined with update in same commit batch for new record
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',10) ON DUPLICATE KEY IGNORE");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1");
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
+        assertTrue(rs.next());
+        assertEquals("a",rs.getString(1));
+        assertEquals(11,rs.getLong(2));
+        assertFalse(rs.next());
+        
+        conn.close();
+    }
+    
+    @Test
+    public void testOverrideOnDupKeyUpdateWithUpsert() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String tableName = generateUniqueName();
+        String ddl = " create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 bigint)";
+        conn.createStatement().execute(ddl);
+        createIndex(conn, tableName);
+        // Test upsert overriding ON DUPLICATE KEY entries
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',1) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',2) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',10)");
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
+        assertTrue(rs.next());
+        assertEquals("a",rs.getString(1));
+        assertEquals(10,rs.getLong(2));
+        assertFalse(rs.next());
+        
+        conn.close();
+    }
+    
+    @Test
+    public void testNewAndMultiUpdateOnSingleColumnAutoCommit() throws Exception {
+        testNewAndMultiUpdateOnSingleColumn(true);
+    }
+    
+    @Test
+    public void testNewAndMultiUpdateOnSingleColumnNoAutoCommit() throws Exception {
+        testNewAndMultiUpdateOnSingleColumn(false);
+    }
+    
+    private void testNewAndMultiUpdateOnSingleColumn(boolean autoCommit) throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(autoCommit);
+        String tableName = generateUniqueName();
+        String ddl = " create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 integer)";
+        conn.createStatement().execute(ddl);
+        createIndex(conn, tableName);
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',5) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1"); // VALUES ignored
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY IGNORE"); // no impact
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',10) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1"); // VALUES ignored
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
+        assertTrue(rs.next());
+        assertEquals("a",rs.getString(1));
+        assertEquals(2,rs.getLong(2));
+        assertFalse(rs.next());
+        
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 2");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 2");
+        conn.commit();
+
+        rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
+        assertTrue(rs.next());
+        assertEquals("a",rs.getString(1));
+        assertEquals(9,rs.getLong(2));
+        assertFalse(rs.next());
+        
+        conn.close();
+    }
+    
+    @Test
+    public void testNewAndMultiDifferentUpdateOnSingleColumn() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String tableName = generateUniqueName();
+        String ddl = " create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 decimal)";
+        conn.createStatement().execute(ddl);
+        createIndex(conn, tableName);
+        String dml = "UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1";
+        conn.createStatement().execute(dml);
+        dml = "UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 2";
+        conn.createStatement().execute(dml);
+        dml = "UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1";
+        conn.createStatement().execute(dml);
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
+        assertTrue(rs.next());
+        assertEquals("a",rs.getString(1));
+        assertEquals(3,rs.getLong(2));
+        assertFalse(rs.next());
+        
+        conn.close();
+    }
+    
+    @Test
+    public void testNewAndMultiDifferentUpdateOnMultipleColumnsAutoCommit() throws Exception {
+        testNewAndMultiDifferentUpdateOnMultipleColumns(true);
+    }
+    
+    @Test
+    public void testNewAndMultiDifferentUpdateOnMultipleColumnsNoAutoCommit() throws Exception {
+        testNewAndMultiDifferentUpdateOnMultipleColumns(false);
+    }
+    
+    private void testNewAndMultiDifferentUpdateOnMultipleColumns(boolean autoCommit) throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(autoCommit);
+        String tableName = generateUniqueName();
+        String ddl = " create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 tinyint)";
+        conn.createStatement().execute(ddl);
+        createIndex(conn, tableName);
+        String dml = "UPSERT INTO " + tableName + " VALUES('a',0,0) ON DUPLICATE KEY UPDATE counter1 = counter2 + 1, counter2 = counter1 + 2";
+        conn.createStatement().execute(dml);
+        conn.createStatement().execute(dml);
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
+        assertTrue(rs.next());
+        assertEquals("a",rs.getString(1));
+        assertEquals(1,rs.getLong(2));
+        assertEquals(2,rs.getLong(3));
+        assertFalse(rs.next());
+        
+        rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ * FROM " + tableName);
+        assertTrue(rs.next());
+        assertEquals("a",rs.getString(1));
+        assertEquals(1,rs.getLong(2));
+        assertEquals(2,rs.getLong(3));
+        assertFalse(rs.next());
+
+        conn.createStatement().execute(dml);
+        conn.commit();
+        
+        rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
+        assertTrue(rs.next());
+        assertEquals("a",rs.getString(1));
+        assertEquals(3,rs.getLong(2));
+        assertEquals(3,rs.getLong(3));
+        assertFalse(rs.next());
+        
+        rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ * FROM " + tableName);
+        assertTrue(rs.next());
+        assertEquals("a",rs.getString(1));
+        assertEquals(3,rs.getLong(2));
+        assertEquals(3,rs.getLong(3));
+        assertFalse(rs.next());
+
+        conn.close();
+    }
+    
+    @Test
+    public void testAtomicUpdate() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        final String tableName = generateUniqueName();
+        String ddl = " create table " + tableName + "(pk varchar primary key, counter1 integer, counter2 integer)";
+        conn.createStatement().execute(ddl);
+        createIndex(conn, tableName);
+        int nThreads = 10;
+        final int[] resultHolder = new int[1];
+        final int nCommits = 100;
+        final int nIncrementsPerCommit = 2;
+        ExecutorService exec = Executors.newFixedThreadPool(nThreads);
+        List<Future> futures = Lists.newArrayListWithExpectedSize(nThreads);
+        Connection[] connections = new Connection[nThreads];
+        for (int i = 0; i < nThreads; i++) {
+            connections[i] = DriverManager.getConnection(getUrl(), props);
+        }
+        for (int i = 0; i < nThreads; i++) {
+            final Connection myConn = connections[i];
+            futures.add(exec.submit(new Runnable() {
+                @Override
+                public void run() {
+                    String dml = "UPSERT INTO " + tableName + " VALUES('a',1) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1";
+                    try {
+                        for (int j = 0; j < nCommits; j++) {
+                            for (int k = 0; k < nIncrementsPerCommit; k++) {
+                                myConn.createStatement().execute(dml);
+                                resultHolder[0]++;
+                            }
+                            myConn.commit();
+                        }
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }));
+        }
+        Collections.shuffle(futures);
+        for (Future future : futures) {
+            future.get();
+        }
+        exec.shutdownNow();
+
+        int finalResult = nThreads * nCommits * nIncrementsPerCommit;
+        //assertEquals(finalResult,resultHolder[0]);
+        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
+        assertTrue(rs.next());
+        assertEquals("a",rs.getString(1));
+        assertEquals(finalResult,rs.getInt(2));
+        assertFalse(rs.next());
+
+        rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ * FROM " + tableName + " WHERE counter1 >= 0");
+        assertTrue(rs.next());
+        assertEquals("a",rs.getString(1));
+        assertEquals(finalResult,rs.getInt(2));
+        assertFalse(rs.next());
+        
+        conn.close();
+    }
+    
+}
+    

http://git-wip-us.apache.org/repos/asf/phoenix/blob/837d114b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java
index ba04ad7..e854f23 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java
@@ -43,11 +43,11 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnFamily;
-import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.PRow;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.SchemaUtil;
@@ -125,7 +125,7 @@ public class IndexTestUtil {
         long ts = MetaDataUtil.getClientTimeStamp(dataMutation);
         if (dataMutation instanceof Delete && dataMutation.getFamilyCellMap().values().isEmpty()) {
             indexTable.newKey(ptr, indexValues);
-            row = indexTable.newRow(builder, ts, ptr);
+            row = indexTable.newRow(builder, ts, ptr, false);
             row.delete();
         } else {
             // If no column families in table, then nothing to look for 
@@ -153,7 +153,7 @@ public class IndexTestUtil {
                 }
             }
             indexTable.newKey(ptr, indexValues);
-            row = indexTable.newRow(builder, ts, ptr);
+            row = indexTable.newRow(builder, ts, ptr, false);
             int pos = 0;
             while ((pos = indexValuesSet.nextSetBit(pos)) >= 0) {
                 int index = nIndexColumns + indexOffset + pos++;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/837d114b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
index 2e45d5a..83128f1 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
@@ -698,4 +698,19 @@ public class TransactionIT extends ParallelStatsDisabledIT {
 
         }
     }
+    
+    
+    @Test
+    public void testOnDupKeyForTransactionalTable() throws Exception {
+        // TODO: we should support having a transactional table defined for a connectionless connection
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String transactTableName = generateUniqueName();
+            conn.createStatement().execute("CREATE TABLE " + transactTableName + " (k integer not null primary key, v bigint) TRANSACTIONAL=true");
+            conn.createStatement().execute("UPSERT INTO " + transactTableName + " VALUES(0,0) ON DUPLICATE KEY UPDATE v = v + 1");
+            fail();
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_TRANSACTIONAL.getErrorCode(), e.getErrorCode());
+        }
+    }
+    
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/837d114b/phoenix-core/src/main/antlr3/PhoenixSQL.g
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g
index 2821a55..1d4ebb8 100644
--- a/phoenix-core/src/main/antlr3/PhoenixSQL.g
+++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g
@@ -136,6 +136,8 @@ tokens
     EXECUTE = 'execute';
     UPGRADE = 'upgrade';
     DEFAULT = 'default';
+    DUPLICATE = 'duplicate';
+    IGNORE = 'ignore';
 }
 
 
@@ -715,10 +717,26 @@ finally{ contextStack.pop(); }
 upsert_node returns [UpsertStatement ret]
     :   UPSERT (hint=hintClause)? INTO t=from_table_name
         (LPAREN p=upsert_column_refs RPAREN)?
-        ((VALUES LPAREN v=one_or_more_expressions RPAREN) | s=select_node)
-        {ret = factory.upsert(factory.namedTable(null,t,p == null ? null : p.getFirst()), hint, p == null ? null : p.getSecond(), v, s, getBindCount(), new HashMap<String, UDFParseNode>(udfParseNodes)); }
-    ;
+        ((VALUES LPAREN v=one_or_more_expressions RPAREN ( ON DUPLICATE KEY ( ig=IGNORE | ( UPDATE pairs=update_column_pairs ) ) )? ) | s=select_node)
+        {ret = factory.upsert(
+            factory.namedTable(null,t,p == null ? null : p.getFirst()), 
+            hint, p == null ? null : p.getSecond(), 
+            v, s, getBindCount(), 
+            new HashMap<String, UDFParseNode>(udfParseNodes),
+            ig != null ? Collections.<Pair<ColumnName,ParseNode>>emptyList() : pairs != null ? pairs : null); }
+    ;
+  
+update_column_pairs returns [ List<Pair<ColumnName,ParseNode>> ret]
+@init{ret = new ArrayList<Pair<ColumnName,ParseNode>>(); }
+    :  p=update_column_pair { ret.add(p); }
+       (COMMA p=update_column_pair { ret.add(p); } )*
+;
+
+update_column_pair returns [ Pair<ColumnName,ParseNode> ret ]
+    :  c=column_name EQ e=expression { $ret = new Pair<ColumnName,ParseNode>(c,e); }
+;
 
+  
 upsert_column_refs returns [Pair<List<ColumnDef>,List<ColumnName>> ret]
 @init{ret = new Pair<List<ColumnDef>,List<ColumnName>>(new ArrayList<ColumnDef>(), new ArrayList<ColumnName>()); }
     :  d=dyn_column_name_or_def { if (d.getDataType()!=null) { $ret.getFirst().add(d); } $ret.getSecond().add(d.getColumnDefName()); } 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/837d114b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index e0881cf..602cd6b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -159,11 +159,11 @@ public class DeleteCompiler {
                 }
                 // When issuing deletes, we do not care about the row time ranges. Also, if the table had a row timestamp column, then the
                 // row key will already have its value. 
-                mutations.put(ptr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO));
+                mutations.put(ptr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
                 for (int i = 0; i < indexTableRefs.size(); i++) {
                     ImmutableBytesPtr indexPtr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map
                     rs.getCurrentRow().getKey(indexPtr);
-                    indexMutations.get(i).put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO));
+                    indexMutations.get(i).put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
                 }
                 if (mutations.size() > maxSize) {
                     throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize);
@@ -499,7 +499,7 @@ public class DeleteCompiler {
                         Iterator<KeyRange> iterator = ranges.getPointLookupKeyIterator(); 
                         Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount());
                         while (iterator.hasNext()) {
-                            mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO));
+                            mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
                         }
                         return new MutationState(tableRef, mutation, 0, maxSize, connection);
                     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/837d114b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 1caf7be..85517a1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -27,6 +27,7 @@ import java.sql.Timestamp;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collections;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
@@ -52,6 +54,7 @@ import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexBuilder;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -75,6 +78,7 @@ import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.ConstraintViolationException;
+import org.apache.phoenix.schema.DelegateColumn;
 import org.apache.phoenix.schema.IllegalDataException;
 import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.MetaDataEntityNotFoundException;
@@ -96,6 +100,7 @@ import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.types.PTimestamp;
 import org.apache.phoenix.schema.types.PUnsignedLong;
+import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.MetaDataUtil;
@@ -107,10 +112,11 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 public class UpsertCompiler {
+
     private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes,
             PTable table, Map<ImmutableBytesPtr, RowMutationState> mutation,
             PhoenixStatement statement, boolean useServerTimestamp, IndexMaintainer maintainer,
-            byte[][] viewConstants) throws SQLException {
+            byte[][] viewConstants, byte[] onDupKeyBytes) throws SQLException {
         Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length);
         byte[][] pkValues = new byte[table.getPKColumns().size()][];
         // If the table uses salting, the first byte is the salting byte, set to an empty array
@@ -154,7 +160,7 @@ public class UpsertCompiler {
                 ptr.set(ScanRanges.prefixKey(ptr.get(), 0, regionPrefix, regionPrefix.length));
             }
         } 
-        mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo));
+        mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo, onDupKeyBytes));
     }
     
     private static MutationState upsertSelect(StatementContext childContext, TableRef tableRef, RowProjector projector,
@@ -208,7 +214,7 @@ public class UpsertCompiler {
                             table.rowKeyOrderOptimizable());
                     values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr);
                 }
-                setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants);
+                setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, null);
                 rowCount++;
                 // Commit a batch if auto commit is true and we're at our batch size
                 if (isAutoCommit && rowCount % batchSize == 0) {
@@ -869,6 +875,85 @@ public class UpsertCompiler {
             constantExpressions.add(expression);
             nodeIndex++;
         }
+        byte[] onDupKeyBytesToBe = null;
+        List<Pair<ColumnName,ParseNode>> onDupKeyPairs = upsert.getOnDupKeyPairs();
+        if (onDupKeyPairs != null) {
+            if (table.isImmutableRows()) {
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_IMMUTABLE)
+                .setSchemaName(table.getSchemaName().getString())
+                .setTableName(table.getTableName().getString())
+                .build().buildException();
+            }
+            if (table.isTransactional()) {
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_TRANSACTIONAL)
+                .setSchemaName(table.getSchemaName().getString())
+                .setTableName(table.getTableName().getString())
+                .build().buildException();
+            }
+            if (connection.getSCN() != null) {
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SET_SCN_IN_ON_DUP_KEY)
+                .setSchemaName(table.getSchemaName().getString())
+                .setTableName(table.getTableName().getString())
+                .build().buildException();
+            }
+            if (onDupKeyPairs.isEmpty()) { // ON DUPLICATE KEY IGNORE
+                onDupKeyBytesToBe = PhoenixIndexBuilder.serializeOnDupKeyIgnore();
+            } else {                       // ON DUPLICATE KEY UPDATE
+                int position = 1;
+                UpdateColumnCompiler compiler = new UpdateColumnCompiler(context);
+                int nColumns = onDupKeyPairs.size();
+                List<Expression> updateExpressions = Lists.newArrayListWithExpectedSize(nColumns);
+                LinkedHashSet<PColumn>updateColumns = Sets.newLinkedHashSetWithExpectedSize(nColumns + 1);
+                updateColumns.add(new PColumnImpl(
+                        table.getPKColumns().get(0).getName(), // Use first PK column name as we know it won't conflict with others
+                        null, PVarbinary.INSTANCE, null, null, false, 0, SortOrder.getDefault(), 0, null, false, null, false, false));
+                for (Pair<ColumnName,ParseNode> columnPair : onDupKeyPairs) {
+                    ColumnName colName = columnPair.getFirst();
+                    PColumn updateColumn = resolver.resolveColumn(null, colName.getFamilyName(), colName.getColumnName()).getColumn();
+                    if (SchemaUtil.isPKColumn(updateColumn)) {
+                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_UPDATE_PK_ON_DUP_KEY)
+                        .setSchemaName(table.getSchemaName().getString())
+                        .setTableName(table.getTableName().getString())
+                        .setColumnName(updateColumn.getName().getString())
+                        .build().buildException();
+                    }
+                    final int columnPosition = position++;
+                    if (!updateColumns.add(new DelegateColumn(updateColumn) {
+                        @Override
+                        public int getPosition() {
+                            return columnPosition;
+                        }
+                    })) {
+                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.DUPLICATE_COLUMN_IN_ON_DUP_KEY)
+                            .setSchemaName(table.getSchemaName().getString())
+                            .setTableName(table.getTableName().getString())
+                            .setColumnName(updateColumn.getName().getString())
+                            .build().buildException();
+                    };
+                    ParseNode updateNode = columnPair.getSecond();
+                    compiler.setColumn(updateColumn);
+                    Expression updateExpression = updateNode.accept(compiler);
+                    // Check that updateExpression is coercible to updateColumn
+                    if (updateExpression.getDataType() != null && !updateExpression.getDataType().isCastableTo(updateColumn.getDataType())) {
+                        throw TypeMismatchException.newException(
+                                updateExpression.getDataType(), updateColumn.getDataType(), "expression: "
+                                        + updateExpression.toString() + " for column " + updateColumn);
+                    }
+                    if (compiler.isAggregate()) {
+                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.AGGREGATION_NOT_ALLOWED_IN_ON_DUP_KEY)
+                            .setSchemaName(table.getSchemaName().getString())
+                            .setTableName(table.getTableName().getString())
+                            .setColumnName(updateColumn.getName().getString())
+                            .build().buildException();
+                    }
+                    updateExpressions.add(updateExpression);
+                }
+                PTable onDupKeyTable = PTableImpl.makePTable(table, updateColumns);
+                onDupKeyBytesToBe = PhoenixIndexBuilder.serializeOnDupKeyUpdate(onDupKeyTable, updateExpressions);
+            }
+        }
+        final byte[] onDupKeyBytes = onDupKeyBytesToBe;
+        
         return new MutationPlan() {
             @Override
             public ParameterMetaData getParameterMetaData() {
@@ -958,7 +1043,7 @@ public class UpsertCompiler {
                     indexMaintainer = table.getIndexMaintainer(parentTable, connection);
                     viewConstants = IndexUtil.getViewConstants(parentTable);
                 }
-                setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants);
+                setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, onDupKeyBytes);
                 return new MutationState(tableRef, mutation, 0, maxSize, connection);
             }
 
@@ -1004,10 +1089,10 @@ public class UpsertCompiler {
         return upsertRef;
     }
 
-    private static final class UpsertValuesCompiler extends ExpressionCompiler {
+    private static class UpdateColumnCompiler extends ExpressionCompiler {
         private PColumn column;
         
-        private UpsertValuesCompiler(StatementContext context) {
+        private UpdateColumnCompiler(StatementContext context) {
             super(context);
         }
 
@@ -1032,7 +1117,12 @@ public class UpsertCompiler {
             }
             return super.visit(node);
         }
-        
+    }
+    
+    private static class UpsertValuesCompiler extends UpdateColumnCompiler {
+        private UpsertValuesCompiler(StatementContext context) {
+            super(context);
+        }
         
         @Override
         public Expression visit(SequenceValueParseNode node) throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/837d114b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 1912ff5..10d21d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -536,7 +536,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                     }
                                 }
                                 projectedTable.newKey(ptr, values);
-                                PRow row = projectedTable.newRow(kvBuilder, ts, ptr);
+                                PRow row = projectedTable.newRow(kvBuilder, ts, ptr, false);
                                 for (; i < projectedColumns.size(); i++) {
                                     Expression expression = selectExpressions.get(i);
                                     if (expression.evaluate(result, ptr)) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/837d114b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 2346224..ac5619f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -326,6 +326,12 @@ public enum SQLExceptionCode {
             return new SequenceNotFoundException(info.getSchemaName(), info.getTableName());
         }
     }),
+    CANNOT_UPDATE_PK_ON_DUP_KEY(1218, "42Z18", "Primary key columns may not be udpated in ON DUPLICATE KEY UPDATE clause." ),
+    CANNOT_USE_ON_DUP_KEY_FOR_IMMUTABLE(1219, "42Z19", "The ON DUPLICATE KEY UPDATE clause may not be used for immutable tables." ),
+    CANNOT_USE_ON_DUP_KEY_FOR_TRANSACTIONAL(1220, "42Z20", "The ON DUPLICATE KEY UPDATE clause may not be used for transactional tables." ),
+    DUPLICATE_COLUMN_IN_ON_DUP_KEY(1221, "42Z21", "Duplicate column in ON DUPLICATE KEY UPDATE." ),
+    AGGREGATION_NOT_ALLOWED_IN_ON_DUP_KEY(1222, "42Z22", "Aggregation in ON DUPLICATE KEY UPDATE is not allowed." ),
+    CANNOT_SET_SCN_IN_ON_DUP_KEY(1223, "42Z23", "The CURRENT_SCN may not be set for statement using ON DUPLICATE KEY." ),
 
     /** Parser error. (errorcode 06, sqlState 42P) */
     PARSER_ERROR(601, "42P00", "Syntax error.", Factory.SYNTAX_ERROR),

http://git-wip-us.apache.org/repos/asf/phoenix/blob/837d114b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index e7e6aa7..d04a79b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -57,6 +57,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.IndexMetaDataCacheClient;
+import org.apache.phoenix.index.PhoenixIndexBuilder;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
@@ -620,6 +621,8 @@ public class MutationState implements SQLCloseable {
         long timestampToUse = timestamp;
         while (iterator.hasNext()) {
             Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry = iterator.next();
+            byte[] onDupKeyBytes = rowEntry.getValue().getOnDupKeyBytes();
+            boolean hasOnDupKey = onDupKeyBytes != null;
             ImmutableBytesPtr key = rowEntry.getKey();
             RowMutationState state = rowEntry.getValue();
             if (tableWithRowTimestampCol) {
@@ -635,7 +638,7 @@ public class MutationState implements SQLCloseable {
             }
             PRow row =
                     tableRef.getTable()
-                            .newRow(connection.getKeyValueBuilder(), timestampToUse, key);
+                            .newRow(connection.getKeyValueBuilder(), timestampToUse, key, hasOnDupKey);
             List<Mutation> rowMutations, rowMutationsPertainingToIndex;
             if (rowEntry.getValue().getColumnValues() == PRow.DELETE_MARKER) { // means delete
                 row.delete();
@@ -650,6 +653,15 @@ public class MutationState implements SQLCloseable {
                     row.setValue(valueEntry.getKey(), valueEntry.getValue());
                 }
                 rowMutations = row.toRowMutations();
+                // Pass through ON DUPLICATE KEY info through mutations
+                // In the case of the same clause being used on many statements, this will be
+                // inefficient because we're transmitting the same information for each mutation.
+                // TODO: use our ServerCache 
+                for (Mutation mutation : rowMutations) {
+                    if (onDupKeyBytes != null) {
+                        mutation.setAttribute(PhoenixIndexBuilder.ATOMIC_OP_ATTRIB, onDupKeyBytes);
+                    }
+                }
                 rowMutationsPertainingToIndex = rowMutations;
             }
             mutationList.addAll(rowMutations);
@@ -1452,15 +1464,22 @@ public class MutationState implements SQLCloseable {
         @Nonnull private Map<PColumn,byte[]> columnValues;
         private int[] statementIndexes;
         @Nonnull private final RowTimestampColInfo rowTsColInfo;
+        private byte[] onDupKeyBytes;
         
-        public RowMutationState(@Nonnull Map<PColumn,byte[]> columnValues, int statementIndex, @Nonnull RowTimestampColInfo rowTsColInfo) {
+        public RowMutationState(@Nonnull Map<PColumn,byte[]> columnValues, int statementIndex, @Nonnull RowTimestampColInfo rowTsColInfo,
+                byte[] onDupKeyBytes) {
             checkNotNull(columnValues);
             checkNotNull(rowTsColInfo);
             this.columnValues = columnValues;
             this.statementIndexes = new int[] {statementIndex};
             this.rowTsColInfo = rowTsColInfo;
+            this.onDupKeyBytes = onDupKeyBytes;
         }
 
+        byte[] getOnDupKeyBytes() {
+            return onDupKeyBytes;
+        }
+        
         Map<PColumn, byte[]> getColumnValues() {
             return columnValues;
         }
@@ -1470,7 +1489,14 @@ public class MutationState implements SQLCloseable {
         }
 
         void join(RowMutationState newRow) {
-            getColumnValues().putAll(newRow.getColumnValues());
+            // If we already have a row and the new row has an ON DUPLICATE KEY clause
+            // ignore the new values (as that's what the server will do).
+            if (newRow.onDupKeyBytes == null) {
+                getColumnValues().putAll(newRow.getColumnValues());
+            }
+            // Concatenate ON DUPLICATE KEY bytes to allow multiple
+            // increments of the same row in the same commit batch.
+            this.onDupKeyBytes = PhoenixIndexBuilder.combineOnDupKey(this.onDupKeyBytes, newRow.onDupKeyBytes);
             statementIndexes = joinSortedIntArrays(statementIndexes, newRow.getStatementIndexes());
         }
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/837d114b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index a964373..3b05a7d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -33,18 +34,23 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.regionserver.OperationStatus;
+import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
@@ -69,6 +75,7 @@ import org.apache.phoenix.hbase.index.write.recovery.PerRegionIndexWriteCache;
 import org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy;
 import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
+import org.apache.phoenix.util.ServerUtil;
 
 import com.google.common.collect.Multimap;
 
@@ -189,6 +196,45 @@ public class Indexer extends BaseRegionObserver {
     this.recoveryWriter.stop(msg);
   }
 
+  /**
+   * We use an Increment to serialize the ON DUPLICATE KEY clause so that the HBase plumbing
+   * sets up the necessary locks and mvcc to allow an atomic update. The Increment is not a
+   * real increment, though, it's really more of a Put. We translate the Increment into a
+   * list of mutations, at most a single Put and Delete that are the changes upon executing
+   * the list of ON DUPLICATE KEY clauses for this row.
+   */
+  @Override
+  public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> e,
+          final Increment inc) throws IOException {
+      try {
+          List<Mutation> mutations = this.builder.executeAtomicOp(inc);
+          if (mutations == null) {
+              return null;
+          }
+
+          // Causes the Increment to be ignored as we're committing the mutations
+          // ourselves below.
+          e.bypass();
+          e.complete();
+          // ON DUPLICATE KEY IGNORE will return empty list if row already exists
+          // as no action is required in that case.
+          if (!mutations.isEmpty()) {
+              Region region = e.getEnvironment().getRegion();
+              // Otherwise, submit the mutations directly here
+              region.mutateRowsWithLocks(
+                      mutations,
+                      Collections.<byte[]>emptyList(), // Rows are already locked
+                      HConstants.NO_NONCE, HConstants.NO_NONCE);
+          }
+          return Result.EMPTY_RESULT;
+      } catch (Throwable t) {
+          throw ServerUtil.createIOException(
+                  "Unable to process ON DUPLICATE IGNORE for " + 
+                  e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString() + 
+                  "(" + Bytes.toStringBinary(inc.getRow()) + ")", t);
+      }
+  }
+
   @Override
   public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
       MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
@@ -206,13 +252,15 @@ public class Indexer extends BaseRegionObserver {
         "Somehow didn't return an index update but also didn't propagate the failure to the client!");
   }
 
+  private static final OperationStatus SUCCESS = new OperationStatus(OperationStatusCode.SUCCESS);
+  
   public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
           MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
 
       // first group all the updates for a single row into a single update to be processed
       Map<ImmutableBytesPtr, MultiMutation> mutations =
               new HashMap<ImmutableBytesPtr, MultiMutation>();
-
+          
       Durability defaultDurability = Durability.SYNC_WAL;
       if(c.getEnvironment().getRegion() != null) {
           defaultDurability = c.getEnvironment().getRegion().getTableDesc().getDurability();
@@ -222,33 +270,35 @@ public class Indexer extends BaseRegionObserver {
       Durability durability = Durability.SKIP_WAL;
       for (int i = 0; i < miniBatchOp.size(); i++) {
           Mutation m = miniBatchOp.getOperation(i);
+          if (this.builder.isAtomicOp(m)) {
+              miniBatchOp.setOperationStatus(i, SUCCESS);
+              continue;
+          }
           // skip this mutation if we aren't enabling indexing
           // unfortunately, we really should ask if the raw mutation (rather than the combined mutation)
           // should be indexed, which means we need to expose another method on the builder. Such is the
           // way optimization go though.
-          if (!this.builder.isEnabled(m)) {
-              continue;
-          }
-
-          Durability effectiveDurablity = (m.getDurability() == Durability.USE_DEFAULT) ? 
-                  defaultDurability : m.getDurability();
-          if (effectiveDurablity.ordinal() > durability.ordinal()) {
-              durability = effectiveDurablity;
-          }
-
-          // add the mutation to the batch set
-          ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
-          MultiMutation stored = mutations.get(row);
-          // we haven't seen this row before, so add it
-          if (stored == null) {
-              stored = new MultiMutation(row);
-              mutations.put(row, stored);
+          if (this.builder.isEnabled(m)) {
+              Durability effectiveDurablity = (m.getDurability() == Durability.USE_DEFAULT) ? 
+                      defaultDurability : m.getDurability();
+              if (effectiveDurablity.ordinal() > durability.ordinal()) {
+                  durability = effectiveDurablity;
+              }
+    
+              // add the mutation to the batch set
+              ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
+              MultiMutation stored = mutations.get(row);
+              // we haven't seen this row before, so add it
+              if (stored == null) {
+                  stored = new MultiMutation(row);
+                  mutations.put(row, stored);
+              }
+              stored.addAll(m);
           }
-          stored.addAll(m);
       }
 
       // early exit if it turns out we don't have any edits
-      if (mutations.entrySet().size() == 0) {
+      if (mutations.isEmpty()) {
           return;
       }
 
@@ -360,7 +410,7 @@ public class Indexer extends BaseRegionObserver {
   private void doPostWithExceptions(WALEdit edit, Mutation m, final Durability durability, boolean allowLocalUpdates)
           throws Exception {
       //short circuit, if we don't need to do any work
-      if (durability == Durability.SKIP_WAL || !this.builder.isEnabled(m)) {
+      if (durability == Durability.SKIP_WAL || !this.builder.isEnabled(m) || edit == null) {
           // already did the index update in prePut, so we are done
           return;
       }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/837d114b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
index 4e329e9..b9174b8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
@@ -12,17 +12,19 @@ package org.apache.phoenix.hbase.index.builder;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.util.Collection;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.covered.IndexCodec;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
 
 /**
@@ -91,6 +93,16 @@ public abstract class BaseIndexBuilder implements IndexBuilder {
         return this.codec.isEnabled(m);
     }
 
+    @Override
+    public boolean isAtomicOp(Mutation m) throws IOException {
+        return false;
+    }
+
+    @Override
+    public List<Mutation> executeAtomicOp(Increment inc) throws IOException {
+        return null;
+    }
+    
     /**
      * Exposed for testing!
      * 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/837d114b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index f411b8e..325904d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
@@ -178,6 +179,14 @@ public class IndexBuildManager implements Stoppable {
     return delegate.isEnabled(m);
   }
 
+  public boolean isAtomicOp(Mutation m) throws IOException {
+    return delegate.isAtomicOp(m);
+  }
+
+  public List<Mutation> executeAtomicOp(Increment inc) throws IOException {
+      return delegate.executeAtomicOp(inc);
+  }
+  
   @Override
   public void stop(String why) {
     if (stopped) {
@@ -196,4 +205,5 @@ public class IndexBuildManager implements Stoppable {
   public IndexBuilder getBuilderForTesting() {
     return this.delegate;
   }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/837d114b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
index 36aba77..dff205a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
@@ -19,11 +19,13 @@ package org.apache.phoenix.hbase.index.builder;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -65,19 +67,10 @@ public interface IndexBuilder extends Stoppable {
    * Implementers must ensure that this method is thread-safe - it could (and probably will) be
    * called concurrently for different mutations, which may or may not be part of the same batch.
    * @param mutation update to the primary table to be indexed.
- * @param context TODO
+   * @param context index meta data for the mutation
    * @return a Map of the mutations to make -> target index table name
    * @throws IOException on failure
    */
-  /* TODO:
-  Create BaseIndexBuilder with everything except getIndexUpdate(). 
-  Derive two concrete classes: NonTxIndexBuilder and TxIndexBuilder.
-  NonTxIndexBuilder will be current impl of this method.
-  TxIndexBuilder will use a scan with skipScan over TxAwareHBase to find the latest values.
-  Conditionally don't do WALEdit stuff for txnal table (ensure Phoenix/HBase tolerates index WAl edit info not being there)
-  Noop Failure mode
-  */
-  
   public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData context) throws IOException;
 
     /**
@@ -139,4 +132,20 @@ public interface IndexBuilder extends Stoppable {
  * @throws IOException 
    */
   public boolean isEnabled(Mutation m) throws IOException;
+  
+  /**
+   * True if mutation has an ON DUPLICATE KEY clause
+   * @param m mutation
+   * @return true if mutation has ON DUPLICATE KEY expression and false otherwise.
+   * @throws IOException
+   */
+  public boolean isAtomicOp(Mutation m) throws IOException;
+
+  /**
+   * Calculate the mutations based on the ON DUPLICATE KEY clause
+   * @param inc increment to run against
+   * @return list of mutations as a result of executing the ON DUPLICATE KEY clause
+   * or null if Increment does not represent an ON DUPLICATE KEY clause.
+   */
+  public List<Mutation> executeAtomicOp(Increment inc) throws IOException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/837d114b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
index 93de11e..e6d683e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
@@ -23,7 +23,6 @@ import org.apache.phoenix.hbase.index.builder.BaseIndexCodec;
  * added to the codec, as well as potentially not haivng to implement some methods.
  */
 public interface IndexCodec {
-
     /**
      * Do any code initialization necessary
      * 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/837d114b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java
index e3bd7a8..741bf87 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java
@@ -18,9 +18,11 @@
 package org.apache.phoenix.hbase.index.util;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
@@ -40,13 +42,14 @@ public abstract class KeyValueBuilder {
      * @throws RuntimeException if there is an IOException thrown from the underlying {@link Put}
      */
     @SuppressWarnings("javadoc")
-    public static void addQuietly(Put put, KeyValueBuilder builder, KeyValue kv) {
-        try {
-            put.add(kv);
-        } catch (IOException e) {
-            throw new RuntimeException("KeyValue Builder " + builder + " created an invalid kv: "
-                    + kv + "!");
+    public static void addQuietly(Mutation m, KeyValueBuilder builder, KeyValue kv) {
+        byte [] family = CellUtil.cloneFamily(kv);
+        List<Cell> list = m.getFamilyCellMap().get(family);
+        if (list == null) {
+            list = new ArrayList<Cell>();
+            m.getFamilyCellMap().put(family, list);
         }
+        list.add(kv);
     }
 
     /**