Posted to commits@phoenix.apache.org by sk...@apache.org on 2021/06/16 22:57:51 UTC

[phoenix] branch 4.x updated: PHOENIX-6387: Conditional updates on tables with indexes

This is an automated email from the ASF dual-hosted git repository.

skadam pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new 68ef271   PHOENIX-6387: Conditional updates on tables with indexes
68ef271 is described below

commit 68ef27155d0e3453efe4325c8d08199b1ebaef5c
Author: tkhurana <kh...@gmail.com>
AuthorDate: Wed Jun 16 15:57:41 2021 -0700

     PHOENIX-6387: Conditional updates on tables with indexes
    
    * PHOENIX-6387 Conditional updates on tables with indexes (#1215)
    
    * PHOENIX-6387 Conditional updates on tables with indexes client side
    
    * PHOENIX-6387 Conditional updates on tables with indexes server side
    
    * Remove the extra read for regular upserts on tables with a local index
    
    * Addressed review comments
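    
    As an illustration, a minimal client-side sketch of what this change enables,
    adapted from the new OnDuplicateKeyIT cases in the diff below (the table,
    index, and column names are examples only, not part of the commit):
    
        // Assumes an open Phoenix JDBC connection `conn`.
        conn.createStatement().execute(
            "CREATE TABLE T (pk VARCHAR PRIMARY KEY, counter1 BIGINT, counter2 BIGINT)");
        conn.createStatement().execute(
            "CREATE INDEX T_IDX ON T (counter1) INCLUDE (counter2)");
        // Before this change an atomic upsert on a table with a global index was
        // rejected with CANNOT_USE_ON_DUP_KEY_WITH_GLOBAL_IDX (that check is removed
        // in UpsertCompiler below); it now executes and keeps T_IDX consistent.
        conn.createStatement().execute(
            "UPSERT INTO T VALUES ('a', 2, 3) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1");
        conn.commit();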
    
    * PHOENIX-6474 Client and server metrics for atomic upserts (#1237)
    
    * PHOENIX-6474 Client and server metrics for atomic upserts
    
    * Fixed failing tests related to metrics
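    
    A sketch of reading the new atomic upsert metrics on the client, assuming the
    table-level helper is PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset
    (the statically imported helper used by the updated PhoenixTableLevelMetricsIT
    below); the table name "T" is an example only:
    
        // Uses org.apache.phoenix.util.PhoenixRuntime and
        // org.apache.phoenix.monitoring.MetricType.
        // Read write metrics before closing the connection; closing clears them.
        Map<MetricType, Long> writeMetrics = PhoenixRuntime
            .getWriteMetricInfoForMutationsSinceLastReset(conn).get("T");
        Long atomicUpsertCount = writeMetrics.get(MetricType.ATOMIC_UPSERT_SQL_COUNTER);
        Long atomicCommitTimeMs = writeMetrics.get(MetricType.ATOMIC_UPSERT_COMMIT_TIME);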
---
 .../apache/phoenix/end2end/OnDuplicateKeyIT.java   | 111 +++-
 .../apache/phoenix/end2end/WALAnnotationIT.java    |  34 ++
 .../end2end/index/GlobalIndexCheckerIT.java        |  28 +
 .../phoenix/monitoring/BasePhoenixMetricsIT.java   |   2 +-
 .../phoenix/monitoring/PhoenixMetricsIT.java       |   2 +-
 .../monitoring/PhoenixTableLevelMetricsIT.java     |  46 ++
 .../org/apache/phoenix/compile/UpsertCompiler.java |   6 -
 .../coprocessor/GlobalIndexRegionScanner.java      |  18 +-
 .../org/apache/phoenix/execute/MutationState.java  |  25 +-
 .../phoenix/hbase/index/IndexRegionObserver.java   | 624 ++++++++++++++++++---
 .../org/apache/phoenix/jdbc/PhoenixStatement.java  |  10 +
 .../org/apache/phoenix/monitoring/MetricType.java  |   6 +
 .../phoenix/monitoring/MutationMetricQueue.java    |  13 +-
 .../phoenix/monitoring/TableClientMetrics.java     |   8 +-
 .../java/org/apache/phoenix/schema/PTableImpl.java |   2 +-
 .../apache/phoenix/compile/QueryCompilerTest.java  |  15 -
 16 files changed, 817 insertions(+), 133 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
index fde31e4..0878c07 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
@@ -36,7 +36,11 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import org.apache.hadoop.hbase.TableName;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -64,6 +68,12 @@ public class OnDuplicateKeyIT extends ParallelStatsDisabledIT {
         testCases.add(new String[] {
                 "create local index %s_IDX on %s(counter1, counter2)",
         });
+        testCases.add(new String[] {
+                "create index %s_IDX on %s(counter1) include (counter2)",
+        });
+        testCases.add(new String[] {
+                "create index %s_IDX on %s(counter1, counter2)",
+        });
         return testCases;
     }
     
@@ -498,18 +508,29 @@ public class OnDuplicateKeyIT extends ParallelStatsDisabledIT {
         exec.shutdownNow();
 
         int finalResult = nThreads * nCommits * nIncrementsPerCommit;
-        //assertEquals(finalResult,resultHolder[0]);
-        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
+        boolean isIndexCreated = this.indexDDL != null && this.indexDDL.length() > 0;
+
+        ResultSet rs;
+        String selectSql = "SELECT * FROM " + tableName + " WHERE counter1 >= 0";
+        if (isIndexCreated) {
+            rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+            String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+            IndexToolIT.assertExplainPlan(this.indexDDL.contains("local"), actualExplainPlan,
+                tableName, tableName + "_IDX");
+        }
+        rs = conn.createStatement().executeQuery(selectSql);
         assertTrue(rs.next());
         assertEquals("a",rs.getString(1));
         assertEquals(finalResult,rs.getInt(2));
         assertFalse(rs.next());
 
-        rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ * FROM " + tableName + " WHERE counter1 >= 0");
-        assertTrue(rs.next());
-        assertEquals("a",rs.getString(1));
-        assertEquals(finalResult,rs.getInt(2));
-        assertFalse(rs.next());
+        if (isIndexCreated) {
+            rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ * FROM " + tableName + " WHERE counter1 >= 0");
+            assertTrue(rs.next());
+            assertEquals("a", rs.getString(1));
+            assertEquals(finalResult, rs.getInt(2));
+            assertFalse(rs.next());
+        }
         
         conn.close();
     }
@@ -648,6 +669,82 @@ public class OnDuplicateKeyIT extends ParallelStatsDisabledIT {
         }
     }
 
+    @Test
+    public void testMultiplePartialUpdatesInSameBatch() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String tableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String ddl = "create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 bigint)";
+            conn.createStatement().execute(ddl);
+            createIndex(conn, tableName);
+            String dml;
+            ResultSet rs;
+            // first commit
+            dml = String.format("UPSERT INTO %s VALUES('a',0,0)", tableName);
+            conn.createStatement().execute(dml);
+            conn.commit();
+            // batch multiple conditional updates (partial) in a single batch
+            dml = String.format(
+                "UPSERT INTO %s VALUES('a',2,3) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1", tableName);
+            conn.createStatement().execute(dml);
+            dml = String.format(
+                "UPSERT INTO %s VALUES('a',2,3) ON DUPLICATE KEY UPDATE counter2 = counter2 + 2", tableName);
+            conn.createStatement().execute(dml);
+            dml = String.format(
+                "UPSERT INTO %s VALUES('a',2,3) ON DUPLICATE KEY UPDATE counter1 = counter1 + 100", tableName);
+            conn.createStatement().execute(dml);
+            dml = String.format(
+                "UPSERT INTO %s VALUES('a',2,3) ON DUPLICATE KEY UPDATE counter2 = counter2 + 200", tableName);
+            conn.createStatement().execute(dml);
+            conn.commit();
+            String dql = String.format("SELECT counter1, counter2 FROM %s WHERE counter1 > 0", tableName);
+            rs = conn.createStatement().executeQuery(dql);
+            assertTrue(rs.next());
+            assertEquals(101, rs.getInt(1));
+            assertEquals(202, rs.getInt(2));
+        }
+    }
+
+    @Test
+    public void testComplexDuplicateKeyExpression() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String tableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String ddl = "create table " + tableName +
+                "(pk varchar primary key, counter1 bigint, counter2 bigint, approval varchar)";
+            conn.createStatement().execute(ddl);
+            createIndex(conn, tableName);
+            String dml;
+            dml = String.format("UPSERT INTO %s VALUES('abc', 0, 100, 'NONE')", tableName);
+            conn.createStatement().execute(dml);
+            conn.commit();
+            dml = String.format("UPSERT INTO %s(pk, counter1, counter2) VALUES ('abc', 0, 10) " +
+                "ON DUPLICATE KEY UPDATE " +
+                "counter1 = counter1 + counter2," +
+                "approval = CASE WHEN counter1 < 100 THEN 'NONE' " +
+                "WHEN counter1 < 1000 THEN 'MANAGER_APPROVAL' " +
+                "ELSE 'VP_APPROVAL' END", tableName);
+            conn.createStatement().execute(dml);
+            conn.commit();
+            String dql = "SELECT * from " + tableName;
+            ResultSet rs = conn.createStatement().executeQuery(dql);
+            assertTrue(rs.next());
+            assertEquals("abc", rs.getString("pk"));
+            assertEquals(100, rs.getInt("counter1"));
+            assertEquals(100, rs.getInt("counter2"));
+            assertEquals("NONE", rs.getString("approval"));
+
+            conn.createStatement().execute(dml);
+            conn.commit();
+            rs = conn.createStatement().executeQuery(dql);
+            assertTrue(rs.next());
+            assertEquals("abc", rs.getString("pk"));
+            assertEquals(200, rs.getInt("counter1"));
+            assertEquals(100, rs.getInt("counter2"));
+            assertEquals("MANAGER_APPROVAL", rs.getString("approval"));
+        }
+    }
+
     private void assertRow(Connection conn, String tableName, String expectedPK, int expectedCol1, String expectedCol2) throws SQLException {
         ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
         assertTrue(rs.next());
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/WALAnnotationIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/WALAnnotationIT.java
index e4618c7..77cbde0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/WALAnnotationIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/WALAnnotationIT.java
@@ -456,6 +456,40 @@ public class WALAnnotationIT extends BaseUniqueNamesOwnClusterIT {
         tenantViewHelper(true);
     }
 
+    @Test
+    public void testOnDuplicateUpsertWithIndex() throws Exception {
+        Assume.assumeFalse(this.isImmutable); // on duplicate is not supported for immutable tables
+        Assume.assumeTrue(HbaseCompatCapabilities.hasPreWALAppend());
+        SchemaBuilder builder = new SchemaBuilder(getUrl());
+        try (Connection conn = getConnection()) {
+            SchemaBuilder.TableOptions tableOptions = getTableOptions();
+            builder.withTableOptions(tableOptions).withTableIndexDefaults().build();
+            PTable table = PhoenixRuntime.getTableNoCache(conn, builder.getEntityTableName());
+            assertEquals("Change Detection Enabled is false!", true, table.isChangeDetectionEnabled());
+            Long ddlTimestamp = table.getLastDDLTimestamp();
+            String upsertSql = "UPSERT INTO " + builder.getEntityTableName() + " VALUES" +
+                " ('a', 'b', 'c', 'd')";
+            conn.createStatement().execute(upsertSql);
+            conn.commit();
+            List<String> columns = builder.getTableOptions().getTableColumns();
+            assertTrue(columns.size() >= 2);
+            String col1 = columns.get(0);
+            String col2 = columns.get(1);
+            // col1 = col1 || col1, col2 = null
+            String onDupClause = String.format("%s = %s || %s, %s = null", col1, col1, col1, col2);
+            // this will result in one Put and one Delete (because of null) mutation
+            upsertSql = "UPSERT INTO " + builder.getEntityTableName() + " VALUES" +
+                " ('a', 'b', 'c', 'd') ON DUPLICATE KEY UPDATE " + onDupClause;
+            conn.createStatement().execute(upsertSql);
+            conn.commit();
+            assertAnnotation(2, builder.getPhysicalTableName(false), null,
+                builder.getTableOptions().getSchemaName(),
+                builder.getDataOptions().getTableName(), PTableType.TABLE, ddlTimestamp);
+            assertAnnotation(0, builder.getPhysicalTableIndexName(false),
+                null, null, null, null, ddlTimestamp);
+        }
+    }
+
     private List<Map<String, byte[]>> getEntriesForTable(TableName tableName) throws IOException {
         AnnotatedWALObserver c = getTestCoprocessor(tableName);
         List<Map<String, byte[]>> entries = c.getWalAnnotationsByTable(tableName);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
index 47eea6d..7668d55 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -1022,6 +1022,34 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT {
         }
     }
 
+    @Test
+    public void testOnDuplicateKeyWithIndex() throws Exception {
+        if (async || encoded) { // run only once with single cell encoding enabled
+            return;
+        }
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            String indexTableName = generateUniqueName();
+            populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
+            conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
+                    dataTableName + " (val1) include (val2, val3)" + this.indexDDLOptions);
+            conn.commit();
+            String upsertSql = "UPSERT INTO " + dataTableName + " VALUES ('a') ON DUPLICATE KEY UPDATE " +
+                "val1 = val1 || val1, val2 = val2 || val2";
+            conn.createStatement().execute(upsertSql);
+            conn.commit();
+            String selectSql = "SELECT * from " + dataTableName + " WHERE val1 = 'abab'";
+            assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+            ResultSet rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals("a", rs.getString(1));
+            assertEquals("abab", rs.getString(2));
+            assertEquals("abcabc", rs.getString(3));
+            assertEquals("abcd", rs.getString(4));
+            assertFalse(rs.next());
+        }
+    }
+
     static private void commitWithException(Connection conn) {
         try {
             conn.commit();
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java
index 7c58945..45b3561 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java
@@ -113,7 +113,7 @@ public class BasePhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
             String t = entry.getKey();
             assertEquals("Table names didn't match!", tableName, t);
             Map<MetricType, Long> p = entry.getValue();
-            assertEquals("There should have been fifteen metrics", 15, p.size());
+            assertEquals("There should have been sixteen metrics", 16, p.size());
             boolean mutationBatchSizePresent = false;
             boolean mutationCommitTimePresent = false;
             boolean mutationBytesPresent = false;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
index dc27dee..32d2f67 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
@@ -487,7 +487,7 @@ public class PhoenixMetricsIT extends BasePhoenixMetricsIT {
             String t = entry.getKey();
             assertEquals("Table names didn't match!", tableName, t);
             Map<MetricType, Long> p = entry.getValue();
-            assertEquals("There should have been five metrics", 15, p.size());
+            assertEquals("There should have been sixteen metrics", 16, p.size());
             boolean mutationBatchSizePresent = false;
             boolean mutationCommitTimePresent = false;
             boolean mutationBytesPresent = false;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
index b1215cd..2c1aaf9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
@@ -56,6 +56,8 @@ import org.junit.experimental.categories.Category;
 import static org.apache.phoenix.exception.SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY;
 import static org.apache.phoenix.exception.SQLExceptionCode.GET_TABLE_REGIONS_FAIL;
 import static org.apache.phoenix.exception.SQLExceptionCode.OPERATION_TIMED_OUT;
+import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_COMMIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_SQL_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_FAILURE_SQL_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_SUCCESS_SQL_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.DELETE_BATCH_FAILED_COUNTER;
@@ -1148,6 +1150,50 @@ public class PhoenixTableLevelMetricsIT extends BaseUniqueNamesOwnClusterIT {
         }
     }
 
+    @Test public void testTableLevelMetricsForAtomicUpserts() throws Throwable {
+        String tableName = generateUniqueName();
+        Connection conn = null;
+        Throwable exception = null;
+        int numAtomicUpserts = 4;
+        try {
+            conn = getConnFromTestDriver();
+            String ddl = "create table " + tableName + "(pk varchar primary key, counter1 bigint)";
+            conn.createStatement().execute(ddl);
+            String dml;
+            ResultSet rs;
+            dml = String.format("UPSERT INTO %s VALUES('a', 0)", tableName);
+            conn.createStatement().execute(dml);
+            dml = String.format("UPSERT INTO %s VALUES('a', 0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1", tableName);
+            for (int i = 0; i < numAtomicUpserts; ++i) {
+                conn.createStatement().execute(dml);
+            }
+            conn.commit();
+            String dql = String.format("SELECT counter1 FROM %s WHERE counter1 > 0", tableName);
+            rs = conn.createStatement().executeQuery(dql);
+            assertTrue(rs.next());
+            assertEquals(4, rs.getInt(1));
+        } catch (Throwable t) {
+            exception = t;
+        } finally {
+            // Otherwise the test fails with an error from assertions below instead of the real exception
+            if (exception != null) {
+                throw exception;
+            }
+            assertNotNull("Failed to get a connection!", conn);
+            // Get write metrics before closing the connection since that clears those metrics
+            Map<MetricType, Long>
+                writeMutMetrics =
+                getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
+            conn.close();
+            // 1 regular upsert + numAtomicUpserts
+            // 2 mutations (regular and atomic on the same row in the same batch will be split)
+            assertMutationTableMetrics(true, tableName, 1 + numAtomicUpserts, 0, 0, true, 2, 0, 0, 2, 0,
+                writeMutMetrics, conn);
+            assertEquals(numAtomicUpserts, getMetricFromTableMetrics(tableName, ATOMIC_UPSERT_SQL_COUNTER));
+            assertTrue(getMetricFromTableMetrics(tableName, ATOMIC_UPSERT_COMMIT_TIME) > 0);
+        }
+    }
+
     private Connection getConnFromTestDriver() throws SQLException {
         Connection conn = DriverManager.getConnection(url);
         assertTrue(conn.unwrap(PhoenixConnection.class)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index f1aa249..075c537 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -864,12 +864,6 @@ public class UpsertCompiler {
                 .setTableName(table.getTableName().getString())
                 .build().buildException();
             }
-            if (SchemaUtil.hasGlobalIndex(table)) {
-                throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_WITH_GLOBAL_IDX)
-                .setSchemaName(table.getSchemaName().getString())
-                .setTableName(table.getTableName().getString())
-                .build().buildException();
-            }
             if (onDupKeyPairs.isEmpty()) { // ON DUPLICATE KEY IGNORE
                 onDupKeyBytesToBe = PhoenixIndexBuilder.serializeOnDupKeyIgnore();
             } else {                       // ON DUPLICATE KEY UPDATE;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
index 6a10edc..fbff011 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
@@ -256,26 +256,30 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
         }
         @Override
         public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) {
-            List<Cell> cellList = put.get(ref.getFamily(), ref.getQualifier());
-            if (cellList == null || cellList.isEmpty()) {
+            Cell cell = getLatestCell(ref, ts);
+            if (cell == null) {
                 return null;
             }
-            Cell cell = cellList.get(0);
             valuePtr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
             return valuePtr;
         }
-        @Override
-        public KeyValue getLatestKeyValue(ColumnReference ref, long ts) {
+        public Cell getLatestCell(ColumnReference ref, long ts) {
             List<Cell> cellList = put.get(ref.getFamily(), ref.getQualifier());
             if (cellList == null || cellList.isEmpty()) {
                 return null;
             }
-            Cell cell = cellList.get(0);
-            return new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
+            return cellList.get(0);
+        }
+        @Override
+        public KeyValue getLatestKeyValue(ColumnReference ref, long ts) {
+            Cell cell = getLatestCell(ref, ts);
+            KeyValue kv = cell == null ? null :
+                new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
                     cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
                     cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
                     cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
                     cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+            return kv;
         }
         @Override
         public byte[] getRowKey() {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 1efbcf0..68c0d02 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -955,6 +955,7 @@ public class MutationState implements SQLCloseable {
         long tempSize;
         long deleteSize = 0, deleteCounter = 0;
         long upsertsize = 0, upsertCounter = 0;
+        long atomicUpsertsize = 0;
         if (GlobalClientMetrics.isMetricsEnabled()) {
             for (Mutation mutation : mutations) {
                 tempSize  = KeyValueUtil.calculateMutationDiskSize(mutation);
@@ -966,6 +967,9 @@ public class MutationState implements SQLCloseable {
                 }else if(mutation instanceof Put) {
                     upsertsize += tempSize;
                     upsertCounter++;
+                    if (mutation.getAttribute(PhoenixIndexBuilder.ATOMIC_OP_ATTRIB) != null) {
+                        atomicUpsertsize += tempSize;
+                    }
                     allDeletesMutations = false;
                 } else {
                     allUpsertsMutations = false;
@@ -976,7 +980,7 @@ public class MutationState implements SQLCloseable {
         if (updateGlobalClientMetrics) {
             GLOBAL_MUTATION_BYTES.update(byteSize);
         }
-        return new MutationBytes(deleteCounter, deleteSize, byteSize, upsertCounter, upsertsize);
+        return new MutationBytes(deleteCounter, deleteSize, byteSize, upsertCounter, upsertsize, atomicUpsertsize);
     }
 
     public long getBatchSizeBytes() {
@@ -994,14 +998,16 @@ public class MutationState implements SQLCloseable {
         private long totalMutationBytes;
         private long upsertMutationCounter;
         private long upsertMutationBytes;
+        private long atomicUpsertMutationBytes; // needed to calculate atomic upsert commit time
 
-        public MutationBytes(long deleteMutationCounter, long deleteMutationBytes, long totalMutationBytes, long
-                upsertMutationCounter, long upsertMutationBytes) {
+        public MutationBytes(long deleteMutationCounter, long deleteMutationBytes, long totalMutationBytes,
+                             long upsertMutationCounter, long upsertMutationBytes, long atomicUpsertMutationBytes) {
             this.deleteMutationCounter = deleteMutationCounter;
             this.deleteMutationBytes = deleteMutationBytes;
             this.totalMutationBytes = totalMutationBytes;
             this.upsertMutationCounter = upsertMutationCounter;
             this.upsertMutationBytes = upsertMutationBytes;
+            this.atomicUpsertMutationBytes = atomicUpsertMutationBytes;
         }
 
 
@@ -1024,6 +1030,8 @@ public class MutationState implements SQLCloseable {
         public long getUpsertMutationBytes() {
             return upsertMutationBytes;
         }
+
+        public long getAtomicUpsertMutationBytes() { return atomicUpsertMutationBytes; }
     }
 
     public enum MutationMetadataType {
@@ -1542,7 +1550,7 @@ public class MutationState implements SQLCloseable {
         // in case we are dealing with all deletes for a non-transactional table, since there is a
         // bug in sendMutations where we don't get the correct value for numFailedMutations when
         // we don't use transactions
-        return new MutationMetricQueue.MutationMetric(0, 0, 0, 0, 0,
+        return new MutationMetricQueue.MutationMetric(0, 0, 0, 0, 0, 0,
                 allDeletesMutations && !isTransactional ? numDeleteMutationsInBatch : numFailedMutations,
                 0, 0, 0, 0,
                 numUpsertMutationsInBatch,
@@ -1571,6 +1579,8 @@ public class MutationState implements SQLCloseable {
             long numFailedPhase3Mutations, long mutationCommitTime) {
         long committedUpsertMutationBytes = totalMutationBytesObject == null ? 0 :
                 totalMutationBytesObject.getUpsertMutationBytes();
+        long committedAtomicUpsertMutationBytes = totalMutationBytesObject == null ? 0:
+                totalMutationBytesObject.getAtomicUpsertMutationBytes();
         long committedDeleteMutationBytes = totalMutationBytesObject == null ? 0 :
                 totalMutationBytesObject.getDeleteMutationBytes();
         long committedUpsertMutationCounter = totalMutationBytesObject == null ? 0 :
@@ -1580,6 +1590,7 @@ public class MutationState implements SQLCloseable {
         long committedTotalMutationBytes = totalMutationBytesObject == null ? 0 :
                 totalMutationBytesObject.getTotalMutationBytes();
         long upsertMutationCommitTime = 0L;
+        long atomicUpsertMutationCommitTime = 0L;
         long deleteMutationCommitTime = 0L;
 
         if (totalMutationBytesObject != null && numFailedMutations != 0) {
@@ -1592,6 +1603,8 @@ public class MutationState implements SQLCloseable {
                     calculateMutationSize(uncommittedMutationsList, false);
             committedUpsertMutationBytes -=
                     uncommittedMutationBytesObject.getUpsertMutationBytes();
+            committedAtomicUpsertMutationBytes -=
+                    uncommittedMutationBytesObject.getAtomicUpsertMutationBytes();
             committedDeleteMutationBytes -=
                     uncommittedMutationBytesObject.getDeleteMutationBytes();
             committedUpsertMutationCounter -=
@@ -1606,6 +1619,9 @@ public class MutationState implements SQLCloseable {
             upsertMutationCommitTime =
                     (long)Math.floor((double)(committedUpsertMutationBytes * mutationCommitTime)/
                             committedTotalMutationBytes);
+            atomicUpsertMutationCommitTime =
+                (long)Math.floor((double)(committedAtomicUpsertMutationBytes * mutationCommitTime)/
+                            committedTotalMutationBytes);
             deleteMutationCommitTime =
                     (long)Math.ceil((double)(committedDeleteMutationBytes * mutationCommitTime)/
                             committedTotalMutationBytes);
@@ -1614,6 +1630,7 @@ public class MutationState implements SQLCloseable {
                 committedUpsertMutationBytes,
                 committedDeleteMutationBytes,
                 upsertMutationCommitTime,
+                atomicUpsertMutationCommitTime,
                 deleteMutationCommitTime,
                 0, // num failed mutations have been counted already in updateMutationBatchFailureMetrics()
                 committedUpsertMutationCounter,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index e1294f0..96b4f18 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -17,21 +17,10 @@
  */
 package org.apache.phoenix.hbase.index;
 
-import static org.apache.phoenix.hbase.index.util.IndexManagementUtil.rethrowIndexingException;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
@@ -41,6 +30,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
@@ -51,6 +41,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.regionserver.OperationStatus;
 import org.apache.hadoop.hbase.regionserver.Region;
@@ -59,6 +50,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
@@ -67,11 +59,20 @@ import org.apache.phoenix.compat.hbase.coprocessor.CompatIndexRegionObserver;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment;
 import org.apache.phoenix.coprocessor.GlobalIndexRegionScanner;
+import org.apache.phoenix.coprocessor.GlobalIndexRegionScanner.SimpleValueGetter;
+import org.apache.phoenix.coprocessor.generated.PTableProtos;
+import org.apache.phoenix.exception.DataExceedsCapacityException;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+import org.apache.phoenix.expression.KeyValueColumnExpression;
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+import org.apache.phoenix.expression.visitor.StatelessTraverseAllExpressionVisitor;
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.hbase.index.LockManager.RowLock;
 import org.apache.phoenix.hbase.index.builder.IndexBuildManager;
 import org.apache.phoenix.hbase.index.builder.IndexBuilder;
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSource;
 import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
 import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
@@ -80,13 +81,20 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.write.IndexWriter;
 import org.apache.phoenix.hbase.index.write.LazyParallelWriterIndexCommitter;
 import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexBuilder;
 import org.apache.phoenix.index.PhoenixIndexMetaData;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PRow;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.SchemaUtil;
@@ -96,12 +104,28 @@ import org.apache.phoenix.util.WALAnnotationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.applyNew;
 import static org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.removeColumn;
+import static org.apache.phoenix.hbase.index.util.IndexManagementUtil.rethrowIndexingException;
+import static org.apache.phoenix.index.PhoenixIndexBuilder.ATOMIC_OP_ATTRIB;
 
 /**
 * Do all the work of managing index updates from a single coprocessor. All Puts/Deletes are passed
@@ -467,7 +491,7 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
               continue;
           }
           Mutation m = miniBatchOp.getOperation(i);
-          if (this.builder.isEnabled(m)) {
+          if (this.builder.isAtomicOp(m) || this.builder.isEnabled(m)) {
               ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
               if (!context.rowsToLock.contains(row)) {
                   context.rowsToLock.add(row);
@@ -476,6 +500,53 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
       }
   }
 
+    /**
+     * Add the mutations generated by the ON DUPLICATE KEY UPDATE to the current batch.
+     * MiniBatchOperationInProgress#addOperationsFromCP() allows coprocessors to attach additional mutations
+     * to the incoming mutation. These additional mutations are only executed if the status of the original
+     * mutation is set to NOT_RUN. For atomic mutations, we want HBase to ignore the incoming mutation and
+     * instead execute the mutations generated by the server for that atomic mutation. But we can’t achieve
+     * this behavior just by setting the status of the original mutation to IGNORE because that will also
+     * ignore the additional mutations added by the coprocessors. To get around this, we need to do a fixup
+     * of the original mutation in the batch. Since we always generate one Put mutation from the incoming atomic
+     * Put mutation, we can transfer the cells from the generated Put mutation to the original atomic Put mutation in the batch.
+     * The additional mutations (Delete) can then be added to the operationsFromCoprocessors array.
+     */
+  private void addOnDupMutationsToBatch(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                          int index, List<Mutation> mutations) {
+      List<Delete> deleteMutations = Lists.newArrayListWithExpectedSize(mutations.size());
+      for (Mutation m : mutations) {
+          if (m instanceof Put) {
+              // fix the incoming atomic mutation
+              Mutation original = miniBatchOp.getOperation(index);
+              original.getFamilyCellMap().putAll(m.getFamilyCellMap());
+          } else if (m instanceof Delete) {
+              deleteMutations.add((Delete)m);
+          }
+      }
+
+      if (!deleteMutations.isEmpty()) {
+          miniBatchOp.addOperationsFromCP(index,
+              deleteMutations.toArray(new Mutation[deleteMutations.size()]));
+      }
+  }
+
+    private void addOnDupMutationsToBatch(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                        BatchMutateContext context) throws IOException {
+      for (int i = 0; i < miniBatchOp.size(); i++) {
+          Mutation m = miniBatchOp.getOperation(i);
+          if (this.builder.isAtomicOp(m) && m instanceof Put) {
+              List<Mutation> mutations = generateOnDupMutations(context, (Put)m);
+              if (!mutations.isEmpty()) {
+                  addOnDupMutationsToBatch(miniBatchOp, i, mutations);
+              } else {
+                  // empty list of generated mutations implies ON DUPLICATE KEY IGNORE
+                  miniBatchOp.setOperationStatus(i, IGNORE);
+              }
+          }
+      }
+  }
+
   private void lockRows(BatchMutateContext context) throws IOException {
       for (ImmutableBytesPtr rowKey : context.rowsToLock) {
           context.rowLocks.add(lockManager.lockRow(rowKey, rowLockWaitDuration));
@@ -520,29 +591,48 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
                     context.multiMutationMap.put(row, stored);
                 }
                 stored.addAll(m);
+                Mutation[] mutationsAddedByCP = miniBatchOp.getOperationsFromCoprocessors(i);
+                if (mutationsAddedByCP != null) {
+                    for (Mutation addedMutation : mutationsAddedByCP) {
+                        stored.addAll(addedMutation);
+                    }
+                }
             }
         }
         return context.multiMutationMap.values();
     }
 
-    public static void setTimestamps(MiniBatchOperationInProgress<Mutation> miniBatchOp, IndexBuildManager builder, long ts) throws IOException {
+    public static void setTimestamps(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                     IndexBuildManager builder, long ts) throws IOException {
         for (Integer i = 0; i < miniBatchOp.size(); i++) {
             if (miniBatchOp.getOperationStatus(i) == IGNORE) {
                 continue;
             }
             Mutation m = miniBatchOp.getOperation(i);
-            // skip this mutation if we aren't enabling indexing
-            if (!builder.isEnabled(m)) {
+            // skip this mutation if indexing is not enabled for it and it is not an atomic op
+            if (!builder.isEnabled(m) && !builder.isAtomicOp(m)) {
                 continue;
             }
-            for (List<Cell> cells : m.getFamilyCellMap().values()) {
-                for (Cell cell : cells) {
-                    CellUtil.setTimestamp(cell, ts);
+            setTimestampOnMutation(m, ts);
+
+            // set the timestamps on any additional mutations added
+            Mutation[] mutationsAddedByCP = miniBatchOp.getOperationsFromCoprocessors(i);
+            if (mutationsAddedByCP != null) {
+                for (Mutation addedMutation : mutationsAddedByCP) {
+                    setTimestampOnMutation(addedMutation, ts);
                 }
             }
         }
     }
 
+    private static void setTimestampOnMutation(Mutation m, long ts) throws IOException {
+        for (List<Cell> cells : m.getFamilyCellMap().values()) {
+            for (Cell cell : cells) {
+                CellUtil.setTimestamp(cell, ts);
+            }
+        }
+    }
+
     /**
      * This method applies pending delete mutations on the next row states
      */
@@ -559,38 +649,51 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
             if (!(m instanceof Delete)) {
                 continue;
             }
-            ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(m.getRow());
-            Pair<Put, Put> dataRowState = context.dataRowStates.get(rowKeyPtr);
-            if (dataRowState == null) {
-                dataRowState = new Pair<Put, Put>(null, null);
-                context.dataRowStates.put(rowKeyPtr, dataRowState);
+
+            if (!applyOnePendingDeleteMutation(context, (Delete) m)) {
+                miniBatchOp.setOperationStatus(i, NOWRITE);
             }
-            Put nextDataRowState = dataRowState.getSecond();
-            if (nextDataRowState == null) {
-                if (dataRowState.getFirst() == null) {
-                    // This is a delete row mutation on a non-existing row. There is no need to apply this mutation
-                    // on the data table
-                    miniBatchOp.setOperationStatus(i, NOWRITE);
-                }
-                continue;
+        }
+    }
+
+    /**
+     * This method returns true if the pending delete mutation needs to be applied
+     * and false if the delete mutation can be ignored, for example in the case of
+     * a delete on a non-existing row.
+     */
+    private boolean applyOnePendingDeleteMutation(BatchMutateContext context, Delete delete) {
+        ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(delete.getRow());
+        Pair<Put, Put> dataRowState = context.dataRowStates.get(rowKeyPtr);
+        if (dataRowState == null) {
+            dataRowState = new Pair<Put, Put>(null, null);
+            context.dataRowStates.put(rowKeyPtr, dataRowState);
+        }
+        Put nextDataRowState = dataRowState.getSecond();
+        if (nextDataRowState == null) {
+            if (dataRowState.getFirst() == null) {
+                // This is a delete row mutation on a non-existing row. There is no need to apply this mutation
+                // on the data table
+                return false;
             }
-            for (List<Cell> cells : m.getFamilyCellMap().values()) {
-                for (Cell cell : cells) {
-                    switch (KeyValue.Type.codeToType(cell.getTypeByte())) {
-                        case DeleteFamily:
-                        case DeleteFamilyVersion:
-                            nextDataRowState.getFamilyCellMap().remove(CellUtil.cloneFamily(cell));
-                            break;
-                        case DeleteColumn:
-                        case Delete:
-                            removeColumn(nextDataRowState, cell);
-                    }
+        }
+
+        for (List<Cell> cells : delete.getFamilyCellMap().values()) {
+            for (Cell cell : cells) {
+                switch (KeyValue.Type.codeToType(cell.getTypeByte())) {
+                    case DeleteFamily:
+                    case DeleteFamilyVersion:
+                        nextDataRowState.getFamilyCellMap().remove(CellUtil.cloneFamily(cell));
+                        break;
+                    case DeleteColumn:
+                    case Delete:
+                        removeColumn(nextDataRowState, cell);
                 }
             }
-            if (nextDataRowState != null && nextDataRowState.getFamilyCellMap().size() == 0) {
-                dataRowState.setSecond(null);
-            }
         }
+        if (nextDataRowState != null && nextDataRowState.getFamilyCellMap().size() == 0) {
+            dataRowState.setSecond(null);
+        }
+        return true;
     }
 
     /**
@@ -608,21 +711,32 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
             if (!this.builder.isEnabled(m)) {
                 continue;
             }
-            if (m instanceof Put) {
-                ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(m.getRow());
-                Pair<Put, Put> dataRowState = context.dataRowStates.get(rowKeyPtr);
-                if (dataRowState == null) {
-                    dataRowState = new Pair<Put, Put>(null, null);
-                    context.dataRowStates.put(rowKeyPtr, dataRowState);
+
+            if (!(m instanceof Put)) {
+                continue;
+            }
+
+            ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(m.getRow());
+            Pair<Put, Put> dataRowState = context.dataRowStates.get(rowKeyPtr);
+            if (dataRowState == null) {
+                dataRowState = new Pair<Put, Put>(null, null);
+                context.dataRowStates.put(rowKeyPtr, dataRowState);
+            }
+            Put nextDataRowState = dataRowState.getSecond();
+            dataRowState.setSecond((nextDataRowState != null) ? applyNew((Put) m, nextDataRowState) : new Put((Put) m));
+
+            Mutation[] mutationsAddedByCP = miniBatchOp.getOperationsFromCoprocessors(i);
+            if (mutationsAddedByCP != null) {
+                // all added mutations are of type delete corresponding to set nulls
+                for (Mutation addedMutation : mutationsAddedByCP) {
+                    applyOnePendingDeleteMutation(context, (Delete)addedMutation);
                 }
-                Put nextDataRowState = dataRowState.getSecond();
-                dataRowState.setSecond((nextDataRowState != null) ? applyNew((Put) m, nextDataRowState) : new Put((Put) m));
             }
         }
     }
 
     /**
-     * * Prepares data row current and next row states
+     * Prepares the next data row state
      */
     private void prepareDataRowStates(ObserverContext<RegionCoprocessorEnvironment> c,
                                       MiniBatchOperationInProgress<Mutation> miniBatchOp,
@@ -631,8 +745,6 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
         if (context.rowsToLock.size() == 0) {
             return;
         }
-        // Retrieve the current row states from the data table
-        getCurrentRowStates(c, context);
         applyPendingPutMutations(miniBatchOp, context, now);
         applyPendingDeleteMutations(miniBatchOp, context);
     }
@@ -677,6 +789,10 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
             localUpdates.add(next.getFirst());
         }
         if (!localUpdates.isEmpty()) {
+            Mutation[] mutationsAddedByCP = miniBatchOp.getOperationsFromCoprocessors(0);
+            if (mutationsAddedByCP != null) {
+                localUpdates.addAll(Arrays.asList(mutationsAddedByCP));
+            }
             miniBatchOp.addOperationsFromCP(0, localUpdates.toArray(new Mutation[localUpdates.size()]));
         }
     }
@@ -885,8 +1001,7 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
                 }
             }
         }
-        removePendingRows(context);
-        context.indexUpdates.clear();
+        // all cleanup will be done in postBatchMutateIndispensably()
     }
 
     private static boolean hasGlobalIndex(PhoenixIndexMetaData indexMetaData) {
@@ -907,6 +1022,19 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
         return false;
     }
 
+    private boolean hasAtomicUpdate(MiniBatchOperationInProgress<Mutation> miniBatchOp) {
+        for (int i = 0; i < miniBatchOp.size(); i++) {
+            if (miniBatchOp.getOperationStatus(i) == IGNORE) {
+                continue;
+            }
+            Mutation m = miniBatchOp.getOperation(i);
+            if (this.builder.isAtomicOp(m)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     private void waitForPreviousConcurrentBatch(TableName table, BatchMutateContext context)
             throws Throwable {
         boolean done;
@@ -914,10 +1042,7 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
         done = true;
         for (BatchMutateContext lastContext : context.lastConcurrentBatchContext.values()) {
             phase = lastContext.getCurrentPhase();
-            if (phase == BatchMutatePhase.FAILED) {
-                done = false;
-                break;
-            }
+
             if (phase == BatchMutatePhase.PRE) {
                 CountDownLatch countDownLatch = lastContext.getCountDownLatch();
                 // Release the locks so that the previous concurrent mutation can go into the post phase
@@ -926,23 +1051,30 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
                 // lastContext.getMaxPendingRowCount() is the depth of the subtree rooted at the batch pointed by lastContext
                 if (!countDownLatch.await((lastContext.getMaxPendingRowCount() + 1) * concurrentMutationWaitDuration,
                         TimeUnit.MILLISECONDS)) {
+                    LOG.debug(String.format("latch timeout context %s last %s", context, lastContext));
                     done = false;
-                    break;
                 }
                 // Acquire the locks again before letting the region proceed with data table updates
                 lockRows(context);
+                if (!done) {
+                    // previous concurrent batch did not complete so we have to retry this batch
+                    break;
+                } else {
+                    // read the phase again to determine the status of previous batch
+                    phase = lastContext.getCurrentPhase();
+                    LOG.debug(String.format("context %s last %s exit phase %s", context, lastContext, phase));
+                }
+            }
+
+            if (phase == BatchMutatePhase.FAILED) {
+                done = false;
+                break;
             }
         }
         if (!done) {
             // This batch needs to be retried since one of the previous concurrent batches has not completed yet.
-            // Throwing an IOException will result in retries of this batch. Before throwing exception,
-            // we need to remove reference counts and locks for the rows of this batch
-            removePendingRows(context);
-            context.indexUpdates.clear();
-            for (RowLock rowLock : context.rowLocks) {
-                rowLock.release();
-            }
-            context.rowLocks.clear();
+            // Throwing an IOException will result in retries of this batch. Removal of reference counts and
+            // locks for the rows of this batch will be done in postBatchMutateIndispensably()
             throw new IOException("One of the previous concurrent mutations has not completed. " +
                     "The batch needs to be retried " + table.getNameAsString());
         }
@@ -950,25 +1082,52 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
 
     public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
                                              MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
-        ignoreAtomicOperations(miniBatchOp);
         PhoenixIndexMetaData indexMetaData = getPhoenixIndexMetaData(c, miniBatchOp);
         BatchMutateContext context = new BatchMutateContext(indexMetaData.getClientVersion());
         setBatchMutateContext(c, context);
         context.populateOriginalMutations(miniBatchOp);
+
         // Need to add cell tags to Delete Marker before we do any index processing
         // since we add tags to tables which doesn't have indexes also.
         IndexUtil.setDeleteAttributes(miniBatchOp);
 
-        /*
-         * Exclusively lock all rows so we get a consistent read
-         * while determining the index updates
-         */
+        // Exclusively lock all rows so we get a consistent read while
+        // determining the index updates
         populateRowsToLock(miniBatchOp, context);
         // early exit if it turns out we don't have any update for indexes
         if (context.rowsToLock.isEmpty()) {
             return;
         }
         lockRows(context);
+
+        boolean hasAtomic = hasAtomicUpdate(miniBatchOp);
+        long onDupCheckTime = 0;
+
+        if (hasAtomic || hasGlobalIndex(indexMetaData)) {
+            // Retrieve the current row states from the data table while holding the lock.
+            // This is needed for both atomic mutations and global indexes
+            long start = EnvironmentEdgeManager.currentTimeMillis();
+            getCurrentRowStates(c, context);
+            onDupCheckTime += (EnvironmentEdgeManager.currentTimeMillis() - start);
+        }
+
+        if (hasAtomic) {
+            long start = EnvironmentEdgeManager.currentTimeMillis();
+            // add the mutations for conditional updates to the mini batch
+            addOnDupMutationsToBatch(miniBatchOp, context);
+
+            // release locks for ON DUPLICATE KEY IGNORE since we won't be changing those rows
+            // this is needed so that we can exit early
+            releaseLocksForOnDupIgnoreMutations(miniBatchOp, context);
+            onDupCheckTime += (EnvironmentEdgeManager.currentTimeMillis() - start);
+            metricSource.updateDuplicateKeyCheckTime(dataTableName, onDupCheckTime);
+
+            // early exit if we are not changing any rows
+            if (context.rowsToLock.isEmpty()) {
+                return;
+            }
+        }
+
         long now = EnvironmentEdgeManager.currentTimeMillis();
         // Update the timestamps of the data table mutations to prevent overlapping timestamps (which prevents index
         // inconsistencies as this case isn't handled correctly currently).
@@ -976,7 +1135,7 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
 
         TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
         if (hasGlobalIndex(indexMetaData)) {
-            // Prepare current and next data rows states for pending mutations (for global indexes)
+            // Prepare next data rows states for pending mutations (for global indexes)
             prepareDataRowStates(c, miniBatchOp, context, now);
             // Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
             // concurrent updates
@@ -1014,17 +1173,47 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
         }
     }
 
+    /**
+     * In the case of ON DUPLICATE KEY IGNORE, if the row already exists no mutations will be
+     * generated, so the row lock can be released.
+     */
+    private void releaseLocksForOnDupIgnoreMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                                     BatchMutateContext context) {
+        for (int i = 0; i < miniBatchOp.size(); i++) {
+            // status of all ON DUPLICATE KEY IGNORE mutations is updated to IGNORE
+            if (miniBatchOp.getOperationStatus(i) != IGNORE) {
+                continue;
+            }
+            Mutation m = miniBatchOp.getOperation(i);
+            if (!this.builder.isAtomicOp(m)) {
+                continue;
+            }
+            ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
+            Iterator<RowLock> rowLockIterator = context.rowLocks.iterator();
+            while (rowLockIterator.hasNext()) {
+                RowLock rowLock = rowLockIterator.next();
+                ImmutableBytesPtr rowKey = rowLock.getRowKey();
+                if (row.equals(rowKey)) {
+                    rowLock.release();
+                    rowLockIterator.remove();
+                    context.rowsToLock.remove(row);
+                    break;
+                }
+            }
+        }
+    }
+
     private void setBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) {
       this.batchMutateContext.set(context);
     }
 
-  private BatchMutateContext getBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
+    private BatchMutateContext getBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
       return this.batchMutateContext.get();
-  }
+    }
 
-  private void removeBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
+    private void removeBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
       this.batchMutateContext.remove();
-  }
+    }
 
     @Override
     public void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> c, WALKey key,
@@ -1035,6 +1224,11 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
         }
     }
 
+    /**
+     * When this hook is called, all the rows in the batch context are locked. Because the rows
+     * are locked, we can safely make updates to the context object and perform the necessary
+     * cleanup.
+     */
   @Override
   public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c,
       MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) throws IOException {
@@ -1056,6 +1250,10 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
                   countDownLatch.countDown();
               }
           }
+          removePendingRows(context);
+          if (context.indexUpdates != null) {
+              context.indexUpdates.clear();
+          }
           unlockRows(context);
           this.builder.batchCompleted(miniBatchOp);
 
@@ -1138,10 +1336,10 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
           metricSource.updatePreIndexUpdateFailureTime(dataTableName,
               EnvironmentEdgeManager.currentTimeMillis() - start);
           metricSource.incrementPreIndexUpdateFailures(dataTableName);
-          // Remove all locks as they are already unlocked. There is no need to unlock them again later when
-          // postBatchMutateIndispensably() is called
-          removePendingRows(context);
-          context.rowLocks.clear();
+          // Re-acquire all locks since we released them before making index updates
+          // Removal of reference counts and locks for the rows of this batch will be
+          // done in postBatchMutateIndispensably()
+          lockRows(context);
           rethrowIndexingException(e);
       }
       throw new RuntimeException(
@@ -1165,4 +1363,256 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
       properties.put(IndexRegionObserver.INDEX_BUILDER_CONF_KEY, builder.getName());
       desc.addCoprocessor(IndexRegionObserver.class.getName(), null, priority, properties);
   }
-}
\ No newline at end of file
+
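+  /**
+   * Deserializes the ON DUPLICATE KEY UPDATE operations (pairs of a projected
+   * PTable and its update expressions) from the serialized attribute bytes,
+   * reading until EOF, and collects every column referenced by those
+   * expressions so that the current row state can be fetched for them.
+   */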
+  private void extractExpressionsAndColumns(DataInputStream input,
+                              List<Pair<PTable, List<Expression>>> operations,
+                              final Set<ColumnReference> colsReadInExpr) throws IOException {
+      while (true) {
+          ExpressionVisitor<Void> visitor = new StatelessTraverseAllExpressionVisitor<Void>() {
+              @Override
+              public Void visit(KeyValueColumnExpression expression) {
+                  colsReadInExpr.add(new ColumnReference(expression.getColumnFamily(), expression.getColumnQualifier()));
+                  return null;
+              }
+          };
+          try {
+              int nExpressions = WritableUtils.readVInt(input);
+              List<Expression> expressions = Lists.newArrayListWithExpectedSize(nExpressions);
+              for (int i = 0; i < nExpressions; i++) {
+                  Expression expression = ExpressionType.values()[WritableUtils.readVInt(input)].newInstance();
+                  expression.readFields(input);
+                  expressions.add(expression);
+                  expression.accept(visitor);
+              }
+              PTableProtos.PTable tableProto = PTableProtos.PTable.parseDelimitedFrom(input);
+              PTable table = PTableImpl.createFromProto(tableProto);
+              operations.add(new Pair<>(table, expressions));
+          } catch (EOFException e) {
+              break;
+          }
+      }
+  }
+
+    /**
+     * This function has been adapted from PhoenixIndexBuilder#executeAtomicOp().
+     * The critical difference is that PhoenixIndexBuilder#executeAtomicOp() generates
+     * the mutations by reading the latest data table row from HBase, whereas to
+     * correctly support concurrent index mutations we need to always read the latest
+     * data table row from memory.
+     * It takes in an atomic Put mutation and generates a list of Put and Delete mutations.
+     * The list will be empty in the case of ON DUPLICATE KEY IGNORE when the row already exists.
+     * In the case of ON DUPLICATE KEY UPDATE, we will generate one Put mutation and optionally
+     * one Delete mutation (with DeleteColumn type cells for all columns set to null).
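+     * For example, given a hypothetical table T(k VARCHAR PRIMARY KEY, counter BIGINT),
+     *   UPSERT INTO T VALUES ('a', 0) ON DUPLICATE KEY UPDATE counter = counter + 1
+     * yields a single Put with the incremented counter when row 'a' already exists,
+     * and the original Put unchanged when it does not.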
+     */
+  private List<Mutation> generateOnDupMutations(BatchMutateContext context, Put atomicPut) throws IOException {
+      List<Mutation> mutations = Lists.newArrayListWithExpectedSize(2);
+      byte[] opBytes = atomicPut.getAttribute(ATOMIC_OP_ATTRIB);
+      if (opBytes == null) { // Unexpected
+          return null;
+      }
+      Put put = null;
+      Delete delete = null;
+
+      // mutations returned by this function will have the LATEST timestamp;
+      // these timestamps will later be updated by IndexRegionObserver#setTimestamps()
+      long ts = HConstants.LATEST_TIMESTAMP;
+
+      byte[] rowKey = atomicPut.getRow();
+      ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(rowKey);
+      // Get the latest data row state
+      Pair<Put, Put> dataRowState = context.dataRowStates.get(rowKeyPtr);
+      Put currentDataRowState = dataRowState != null ? dataRowState.getFirst() : null;
+
+      if (PhoenixIndexBuilder.isDupKeyIgnore(opBytes)) {
+          if (currentDataRowState == null) {
+              // new row
+              mutations.add(atomicPut);
+          }
+          return mutations;
+      }
+
+      ByteArrayInputStream stream = new ByteArrayInputStream(opBytes);
+      DataInputStream input = new DataInputStream(stream);
+      boolean skipFirstOp = input.readBoolean();
+      short repeat = input.readShort();
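+      // skipFirstOp: skip the first serialized ON DUPLICATE KEY UPDATE clause
+      // when the row doesn't exist yet (the UPSERT VALUES row is used instead);
+      // repeat: the number of times the first clause is applied, used when the
+      // same atomic upsert is issued repeatedly for this row in one batch
+      // (see the repeater loop below)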
+
+      List<Pair<PTable, List<Expression>>> operations = Lists.newArrayListWithExpectedSize(3);
+      final Set<ColumnReference> colsReadInExpr = new HashSet<>();
+      // deserialize the conditional update expressions and
+      // extract the columns that are read in the conditional expressions
+      extractExpressionsAndColumns(input, operations, colsReadInExpr);
+      int estimatedSize = colsReadInExpr.size();
+
+      // initialized to either the incoming new row or the current row
+      // stores the intermediate values as we apply conditional update expressions
+      List<Cell> flattenedCells;
+      // read the column values referenced by the conditional expressions from the current data row
+      List<Cell> cells = readColumnsFromRow(currentDataRowState, colsReadInExpr);
+
+      if (currentDataRowState == null) { // row doesn't exist
+          if (skipFirstOp) {
+              if (operations.size() <= 1 && repeat <= 1) {
+                  // early exit since there is only one ON DUPLICATE KEY UPDATE
+                  // clause, which is ignored because the row doesn't exist, so
+                  // simply use the values from UPSERT VALUES
+                  mutations.add(atomicPut);
+                  return mutations;
+              }
+              // If there are multiple ON DUPLICATE KEY UPDATE clauses on a new row,
+              // the first one is skipped
+              repeat--;
+          }
+          // Base current state off of new row
+          flattenedCells = flattenCells(atomicPut);
+      } else {
+          // Base current state off of existing row
+          flattenedCells = cells;
+      }
+
+      MultiKeyValueTuple tuple = new MultiKeyValueTuple(flattenedCells);
+      ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+
+      // for each conditional upsert in the batch
+      for (int opIndex = 0; opIndex < operations.size(); opIndex++) {
+          Pair<PTable, List<Expression>> operation = operations.get(opIndex);
+          PTable table = operation.getFirst();
+          List<Expression> expressions = operation.getSecond();
+          for (int j = 0; j < repeat; j++) { // repeater loop
+              ptr.set(rowKey);
+              // Sort the list of cells (if they've been flattened, they're not
+              // necessarily ordered correctly).
+              if (flattenedCells != null) {
+                  Collections.sort(flattenedCells, KeyValue.COMPARATOR);
+              }
+              PRow row = table.newRow(GenericKeyValueBuilder.INSTANCE, ts, ptr, false);
+              int adjust = table.getBucketNum() == null ? 1 : 2;
+              for (int i = 0; i < expressions.size(); i++) {
+                  Expression expression = expressions.get(i);
+                  ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
+                  expression.evaluate(tuple, ptr);
+                  PColumn column = table.getColumns().get(i + adjust);
+                  Object value = expression.getDataType().toObject(ptr, column.getSortOrder());
+                  // We are guaranteed that the two columns will have the same type
+                  if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(),
+                      expression.getSortOrder(), expression.getMaxLength(), expression.getScale(),
+                      column.getMaxLength(), column.getScale())) {
+                      throw new DataExceedsCapacityException(column.getDataType(), column.getMaxLength(),
+                          column.getScale(), column.getName().getString());
+                  }
+                  column.getDataType().coerceBytes(ptr, value, expression.getDataType(), expression.getMaxLength(),
+                      expression.getScale(), expression.getSortOrder(), column.getMaxLength(), column.getScale(),
+                      column.getSortOrder(), table.rowKeyOrderOptimizable());
+                  byte[] bytes = ByteUtil.copyKeyBytesIfNecessary(ptr);
+                  row.setValue(column, bytes);
+              }
+              List<Cell> updatedCells = Lists.newArrayListWithExpectedSize(estimatedSize);
+              List<Mutation> newMutations = row.toRowMutations();
+              for (Mutation source : newMutations) {
+                  flattenCells(source, updatedCells);
+              }
+              // update the cells to the latest values calculated above
+              flattenedCells = mergeCells(flattenedCells, updatedCells);
+              tuple.setKeyValues(flattenedCells);
+          }
+          // Repeat only applies to the first statement
+          repeat = 1;
+      }
+
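+      // Convert the final merged cell state back into mutations, transferring
+      // the attributes of the original atomic Put onto each generated mutation.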
+      for (int i = 0; i < tuple.size(); i++) {
+          Cell cell = tuple.getValue(i);
+          if (Type.codeToType(cell.getTypeByte()) == Type.Put) {
+              if (put == null) {
+                  put = new Put(rowKey);
+                  transferAttributes(atomicPut, put);
+                  mutations.add(put);
+              }
+              put.add(cell);
+          } else {
+              if (delete == null) {
+                  delete = new Delete(rowKey);
+                  transferAttributes(atomicPut, delete);
+                  mutations.add(delete);
+              }
+              delete.addDeleteMarker(cell);
+          }
+      }
+      return mutations;
+  }
+
+    private List<Cell> readColumnsFromRow(Put currentDataRow, Set<ColumnReference> cols) {
+        if (currentDataRow == null) {
+            return Collections.emptyList();
+        }
+
+        List<Cell> columnValues = Lists.newArrayList();
+
+        // no specific columns were requested, so (like FirstKeyOnlyFilter)
+        // just return any one cell to indicate that the row exists
+        if (cols.isEmpty()) {
+            for (List<Cell> cells : currentDataRow.getFamilyCellMap().values()) {
+                if (cells == null || cells.isEmpty()) {
+                    continue;
+                }
+                columnValues.add(cells.get(0));
+                break;
+            }
+            return columnValues;
+        }
+
+        SimpleValueGetter valueGetter = new SimpleValueGetter(currentDataRow);
+        for (ColumnReference colRef : cols) {
+            Cell cell = valueGetter.getLatestCell(colRef, HConstants.LATEST_TIMESTAMP);
+            if (cell != null) {
+                columnValues.add(cell);
+            }
+        }
+        return columnValues;
+    }
+
+    private static List<Cell> flattenCells(Mutation m) {
+        List<Cell> flattenedCells = new ArrayList<>();
+        flattenCells(m, flattenedCells);
+        return flattenedCells;
+    }
+
+    private static void flattenCells(Mutation m, List<Cell> flattenedCells) {
+        for (List<Cell> cells : m.getFamilyCellMap().values()) {
+            flattenedCells.addAll(cells);
+        }
+    }
+
+    /**
+     * Ensures that the generated mutations carry all the attributes of the
+     * source mutation, such as the schema.
+     */
+    private static void transferAttributes(Mutation source, Mutation target) {
+        for (Map.Entry<String, byte[]> entry : source.getAttributesMap().entrySet()) {
+            target.setAttribute(entry.getKey(), entry.getValue());
+        }
+    }
+
+    /**
+     * First take all the cells that are present in latest. Then take any cell
+     * from current that is not present in latest.
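+     * For example, if current = [A=1, B=2] and latest = [B=3], the merged
+     * result is [A=1, B=3].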
+     */
+    private static List<Cell> mergeCells(List<Cell> current, List<Cell> latest) {
+        Map<ColumnReference, Cell> latestColVals = Maps.newHashMapWithExpectedSize(latest.size() + current.size());
+
+        // first take everything present in latest
+        for (Cell cell : latest) {
+            byte[] family = CellUtil.cloneFamily(cell);
+            byte[] qualifier = CellUtil.cloneQualifier(cell);
+            ColumnReference colInfo = new ColumnReference(family, qualifier);
+            latestColVals.put(colInfo, cell);
+        }
+
+        // check for any leftovers in current
+        for (Cell cell : current) {
+            byte[] family = CellUtil.cloneFamily(cell);
+            byte[] qualifier = CellUtil.cloneQualifier(cell);
+            ColumnReference colInfo = new ColumnReference(family, qualifier);
+            if (!latestColVals.containsKey(colInfo)) {
+                latestColVals.put(colInfo, cell);
+            }
+        }
+        return Lists.newArrayList(latestColVals.values());
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 9d7d5f0..2a76887 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -20,6 +20,8 @@ package org.apache.phoenix.jdbc;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_SQL_COUNTER;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIME;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SELECT_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_SQL_QUERY_TIME;
 import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_FAILURE_SQL_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.DELETE_FAILED_SQL_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.DELETE_SQL_COUNTER;
@@ -511,6 +513,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
                             boolean success = false;
                             String tableName = null;
                             boolean isUpsert = false;
+                            boolean isAtomicUpsert = false;
                             boolean isDelete = false;
                             MutationState state = null;
                             MutationPlan plan = null;
@@ -525,6 +528,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
                                 plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.VALIDATE_SEQUENCE);
                                 isUpsert = stmt instanceof ExecutableUpsertStatement;
                                 isDelete = stmt instanceof ExecutableDeleteStatement;
+                                isAtomicUpsert = isUpsert && ((ExecutableUpsertStatement)stmt).getOnDupKeyPairs() != null;
                                 if (plan.getTargetRef() != null && plan.getTargetRef().getTable() != null) {
                                     if(!Strings.isNullOrEmpty(plan.getTargetRef().getTable().getPhysicalName().toString())) {
                                         tableName = plan.getTargetRef().getTable().getPhysicalName().toString();
@@ -596,6 +600,12 @@ public class PhoenixStatement implements Statement, SQLCloseable {
                                                 UPSERT_SQL_COUNTER : DELETE_SQL_COUNTER, 1);
                                         TableMetricsManager.updateMetricsMethod(tableName, isUpsert ?
                                                 UPSERT_SQL_QUERY_TIME : DELETE_SQL_QUERY_TIME, executeMutationTimeSpent);
+                                        if (isAtomicUpsert) {
+                                            TableMetricsManager.updateMetricsMethod(tableName,
+                                                ATOMIC_UPSERT_SQL_COUNTER, 1);
+                                            TableMetricsManager.updateMetricsMethod(tableName,
+                                                ATOMIC_UPSERT_SQL_QUERY_TIME, executeMutationTimeSpent);
+                                        }
 
                                         if (success) {
                                             TableMetricsManager.updateMetricsMethod(tableName, isUpsert ?
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
index dced4ca..ed7d003 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
@@ -56,6 +56,12 @@ public enum MetricType {
             + " autoCommit is true, the total time taken for executeMutation + conn.commit",
             LogLevel.OFF, PLong.INSTANCE),
 
+    ATOMIC_UPSERT_SQL_COUNTER("auc", "Counter for number of atomic upsert sql queries", LogLevel.OFF, PLong.INSTANCE),
+    ATOMIC_UPSERT_COMMIT_TIME("aut", "Time it took to commit a batch of atomic upserts", LogLevel.OFF, PLong.INSTANCE),
+    ATOMIC_UPSERT_SQL_QUERY_TIME("auqt", "Time taken by atomic upsert sql queries inside executeMutation or if"
+        + " autoCommit is true, the total time taken for executeMutation + conn.commit",
+        LogLevel.OFF, PLong.INSTANCE),
+
     // delete-specific metrics updated during executeMutation
     DELETE_SQL_COUNTER("dc", "Counter for number of delete sql queries", LogLevel.OFF, PLong.INSTANCE),
     DELETE_SUCCESS_SQL_COUNTER("dssc", "Counter for number of delete sql queries that successfully"
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
index b8444c3..a44483a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.monitoring;
 
+import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_COMMIT_TIME;
 import static org.apache.phoenix.monitoring.MetricType.DELETE_BATCH_FAILED_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.DELETE_BATCH_FAILED_SIZE;
 import static org.apache.phoenix.monitoring.MetricType.DELETE_COMMIT_TIME;
@@ -82,6 +83,7 @@ public class MutationMetricQueue {
             publishedMetricsForTable.put(metric.getDeleteMutationsSizeBytes().getMetricType(), metric.getDeleteMutationsSizeBytes().getValue());
             publishedMetricsForTable.put(metric.getCommitTimeForMutations().getMetricType(), metric.getCommitTimeForMutations().getValue());
             publishedMetricsForTable.put(metric.getTotalCommitTimeForUpserts().getMetricType(), metric.getTotalCommitTimeForUpserts().getValue());
+            publishedMetricsForTable.put(metric.getTotalCommitTimeForAtomicUpserts().getMetricType(), metric.getTotalCommitTimeForAtomicUpserts().getValue());
             publishedMetricsForTable.put(metric.getTotalCommitTimeForDeletes().getMetricType(), metric.getTotalCommitTimeForDeletes().getValue());
             publishedMetricsForTable.put(metric.getNumFailedMutations().getMetricType(), metric.getNumFailedMutations().getValue());
             publishedMetricsForTable.put(metric.getNumOfIndexCommitFailedMutations().getMetricType(), metric.getNumOfIndexCommitFailedMutations().getValue());
@@ -110,6 +112,7 @@ public class MutationMetricQueue {
         private final CombinableMetric totalCommitTimeForMutations = new CombinableMetricImpl(MUTATION_COMMIT_TIME);
         private final CombinableMetric numFailedMutations = new CombinableMetricImpl(MUTATION_BATCH_FAILED_SIZE);
         private final CombinableMetric totalCommitTimeForUpserts = new CombinableMetricImpl(UPSERT_COMMIT_TIME);
+        private final CombinableMetric totalCommitTimeForAtomicUpserts = new CombinableMetricImpl(ATOMIC_UPSERT_COMMIT_TIME);
         private final CombinableMetric totalCommitTimeForDeletes = new CombinableMetricImpl(DELETE_COMMIT_TIME);
         private final CombinableMetric upsertMutationsSizeBytes = new CombinableMetricImpl(UPSERT_MUTATION_BYTES);
         private final CombinableMetric deleteMutationsSizeBytes = new CombinableMetricImpl(DELETE_MUTATION_BYTES);
@@ -124,17 +127,18 @@ public class MutationMetricQueue {
                 INDEX_COMMIT_FAILURE_SIZE);
 
         public static final MutationMetric EMPTY_METRIC =
-                new MutationMetric(0,0,0,0,0,0,0,0,0,0,0,0,0,0);
+                new MutationMetric(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
 
         public MutationMetric(long numMutations, long upsertMutationsSizeBytes,
-                long deleteMutationsSizeBytes, long commitTimeForUpserts, long commitTimeForDeletes,
-                long numFailedMutations, long upsertMutationSqlCounterSuccess,
+                long deleteMutationsSizeBytes, long commitTimeForUpserts, long commitTimeForAtomicUpserts,
+                long commitTimeForDeletes, long numFailedMutations, long upsertMutationSqlCounterSuccess,
                 long deleteMutationSqlCounterSuccess, long totalMutationBytes,
                 long numOfPhase3Failed, long upsertBatchFailedSize,
                 long upsertBatchFailedCounter, long deleteBatchFailedSize,
                 long deleteBatchFailedCounter) {
             this.numMutations.change(numMutations);
             this.totalCommitTimeForUpserts.change(commitTimeForUpserts);
+            this.totalCommitTimeForAtomicUpserts.change(commitTimeForAtomicUpserts);
             this.totalCommitTimeForDeletes.change(commitTimeForDeletes);
             this.totalCommitTimeForMutations.change(commitTimeForUpserts + commitTimeForDeletes);
             this.numFailedMutations.change(numFailedMutations);
@@ -154,6 +158,8 @@ public class MutationMetricQueue {
             return totalCommitTimeForUpserts;
         }
 
+        public CombinableMetric getTotalCommitTimeForAtomicUpserts() {
+            return totalCommitTimeForAtomicUpserts;
+        }
+
         public CombinableMetric getTotalCommitTimeForDeletes() {
             return totalCommitTimeForDeletes;
         }
@@ -213,6 +219,7 @@ public class MutationMetricQueue {
         public void combineMetric(MutationMetric other) {
             this.numMutations.combine(other.numMutations);
             this.totalCommitTimeForUpserts.combine(other.totalCommitTimeForUpserts);
+            this.totalCommitTimeForAtomicUpserts.combine(other.totalCommitTimeForAtomicUpserts);
             this.totalCommitTimeForDeletes.combine(other.totalCommitTimeForDeletes);
             this.totalCommitTimeForMutations.combine(other.totalCommitTimeForMutations);
             this.numFailedMutations.combine(other.numFailedMutations);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TableClientMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TableClientMetrics.java
index b640f29..13ef856 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TableClientMetrics.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TableClientMetrics.java
@@ -68,6 +68,9 @@ import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_SUCCESS_
 import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_FAILURE_SQL_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.SELECT_AGGREGATE_SUCCESS_SQL_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.SELECT_AGGREGATE_FAILURE_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_COMMIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_SQL_QUERY_TIME;
 
 /**
  * This is used by TableMetricsManager class to store instance of
@@ -121,7 +124,10 @@ public class TableClientMetrics {
                 DELETE_AGGREGATE_SUCCESS_SQL_COUNTER), TABLE_DELETE_AGGREGATE_FAILURE_SQL_COUNTER(
                 DELETE_AGGREGATE_FAILURE_SQL_COUNTER), TABLE_SELECT_AGGREGATE_SUCCESS_SQL_COUNTER(
                 SELECT_AGGREGATE_SUCCESS_SQL_COUNTER), TABLE_SELECT_AGGREGATE_FAILURE_SQL_COUNTER(
-                SELECT_AGGREGATE_FAILURE_SQL_COUNTER);
+                SELECT_AGGREGATE_FAILURE_SQL_COUNTER),
+                TABLE_ATOMIC_UPSERT_SQL_COUNTER(ATOMIC_UPSERT_SQL_COUNTER),
+                TABLE_ATOMIC_UPSERT_COMMIT_TIME(ATOMIC_UPSERT_COMMIT_TIME),
+                TABLE_ATOMIC_UPSERT_SQL_QUERY_TIME(ATOMIC_UPSERT_SQL_QUERY_TIME);
 
         private final MetricType metricType;
         private PhoenixTableMetric metric;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index cf9518f..ca95c1a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -1284,7 +1284,7 @@ public class PTableImpl implements PTable {
         }
 
         private void newMutations() {
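+            // Atomic (ON DUPLICATE KEY) upserts no longer piggyback on HBase
+            // Increment; they are now regular Puts carrying the atomic-op
+            // attribute and are resolved server side in IndexRegionObserver.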
-            Mutation put = this.hasOnDupKey ? new Increment(this.key) : new Put(this.key);
+            Mutation put = new Put(this.key);
             Delete delete = new Delete(this.key);
             if (isWALDisabled()) {
                 put.setDurability(Durability.SKIP_WAL);
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index 06c8fd1..4500a18 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -2851,21 +2851,6 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
     }
 
     @Test
-    public void testOnDupKeyWithGlobalIndex() throws Exception {
-        Connection conn = DriverManager.getConnection(getUrl());
-        try {
-            conn.createStatement().execute("CREATE TABLE t1 (k integer not null primary key, v bigint)");
-            conn.createStatement().execute("CREATE INDEX idx ON t1 (v)");
-            conn.createStatement().execute("UPSERT INTO t1 VALUES(0,0) ON DUPLICATE KEY UPDATE v = v + 1");
-            fail();
-        } catch (SQLException e) {
-            assertEquals(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_WITH_GLOBAL_IDX.getErrorCode(), e.getErrorCode());
-        } finally {
-            conn.close();
-        }
-    }
-
-    @Test
     public void testUpdatePKOnDupKey() throws Exception {
         Connection conn = DriverManager.getConnection(getUrl());
         try {