You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sk...@apache.org on 2021/06/07 17:46:19 UTC

[phoenix] branch PHOENIX-6387-4.x updated: PHOENIX-6474 Client and server metrics for atomic upserts (#1237)

This is an automated email from the ASF dual-hosted git repository.

skadam pushed a commit to branch PHOENIX-6387-4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-6387-4.x by this push:
     new 50160a0  PHOENIX-6474 Client and server metrics for atomic upserts (#1237)
50160a0 is described below

commit 50160a096a0d7dff609bdcd50024a76b70f4f84c
Author: tkhurana <kh...@gmail.com>
AuthorDate: Mon Jun 7 10:46:08 2021 -0700

    PHOENIX-6474 Client and server metrics for atomic upserts (#1237)
    
    * PHOENIX-6474 Client and server metrics for atomic upserts
    
    * Fixed failing tests related to metrics
---
 .../phoenix/monitoring/BasePhoenixMetricsIT.java   |  2 +-
 .../phoenix/monitoring/PhoenixMetricsIT.java       |  2 +-
 .../monitoring/PhoenixTableLevelMetricsIT.java     | 46 ++++++++++++++++++++++
 .../org/apache/phoenix/execute/MutationState.java  | 25 ++++++++++--
 .../phoenix/hbase/index/IndexRegionObserver.java   |  7 ++++
 .../org/apache/phoenix/jdbc/PhoenixStatement.java  | 10 +++++
 .../org/apache/phoenix/monitoring/MetricType.java  |  6 +++
 .../phoenix/monitoring/MutationMetricQueue.java    | 13 ++++--
 .../phoenix/monitoring/TableClientMetrics.java     |  8 +++-
 9 files changed, 109 insertions(+), 10 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java
index 7c58945..45b3561 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java
@@ -113,7 +113,7 @@ public class BasePhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
             String t = entry.getKey();
             assertEquals("Table names didn't match!", tableName, t);
             Map<MetricType, Long> p = entry.getValue();
-            assertEquals("There should have been fifteen metrics", 15, p.size());
+            assertEquals("There should have been sixteen metrics", 16, p.size());
             boolean mutationBatchSizePresent = false;
             boolean mutationCommitTimePresent = false;
             boolean mutationBytesPresent = false;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
index dc27dee..32d2f67 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
@@ -487,7 +487,7 @@ public class PhoenixMetricsIT extends BasePhoenixMetricsIT {
             String t = entry.getKey();
             assertEquals("Table names didn't match!", tableName, t);
             Map<MetricType, Long> p = entry.getValue();
-            assertEquals("There should have been five metrics", 15, p.size());
+            assertEquals("There should have been sixteen metrics", 16, p.size());
             boolean mutationBatchSizePresent = false;
             boolean mutationCommitTimePresent = false;
             boolean mutationBytesPresent = false;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
index b1215cd..2c1aaf9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
@@ -56,6 +56,8 @@ import org.junit.experimental.categories.Category;
 import static org.apache.phoenix.exception.SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY;
 import static org.apache.phoenix.exception.SQLExceptionCode.GET_TABLE_REGIONS_FAIL;
 import static org.apache.phoenix.exception.SQLExceptionCode.OPERATION_TIMED_OUT;
+import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_COMMIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_SQL_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_FAILURE_SQL_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_SUCCESS_SQL_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.DELETE_BATCH_FAILED_COUNTER;
@@ -1148,6 +1150,50 @@ public class PhoenixTableLevelMetricsIT extends BaseUniqueNamesOwnClusterIT {
         }
     }
 
+    @Test public void testTableLevelMetricsForAtomicUpserts() throws Throwable {
+        String tableName = generateUniqueName();
+        Connection conn = null;
+        Throwable exception = null;
+        int numAtomicUpserts = 4;
+        try {
+            conn = getConnFromTestDriver();
+            String ddl = "create table " + tableName + "(pk varchar primary key, counter1 bigint)";
+            conn.createStatement().execute(ddl);
+            String dml;
+            ResultSet rs;
+            dml = String.format("UPSERT INTO %s VALUES('a', 0)", tableName);
+            conn.createStatement().execute(dml);
+            dml = String.format("UPSERT INTO %s VALUES('a', 0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1", tableName);
+            for (int i = 0; i < numAtomicUpserts; ++i) {
+                conn.createStatement().execute(dml);
+            }
+            conn.commit();
+            String dql = String.format("SELECT counter1 FROM %s WHERE counter1 > 0", tableName);
+            rs = conn.createStatement().executeQuery(dql);
+            assertTrue(rs.next());
+            assertEquals(4, rs.getInt(1));
+        }catch (Throwable t) {
+            exception = t;
+        } finally {
+            // Otherwise the test fails with an error from assertions below instead of the real exception
+            if (exception != null) {
+                throw exception;
+            }
+            assertNotNull("Failed to get a connection!", conn);
+            // Get write metrics before closing the connection since that clears those metrics
+            Map<MetricType, Long>
+                writeMutMetrics =
+                getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
+            conn.close();
+            // 1 regular upsert + numAtomicUpserts
+            // 2 mutations (regular and atomic on the same row in the same batch will be split)
+            assertMutationTableMetrics(true, tableName, 1 + numAtomicUpserts, 0, 0, true, 2, 0, 0, 2, 0,
+                writeMutMetrics, conn);
+            assertEquals(numAtomicUpserts, getMetricFromTableMetrics(tableName, ATOMIC_UPSERT_SQL_COUNTER));
+            assertTrue(getMetricFromTableMetrics(tableName, ATOMIC_UPSERT_COMMIT_TIME) > 0);
+        }
+    }
+
     private Connection getConnFromTestDriver() throws SQLException {
         Connection conn = DriverManager.getConnection(url);
         assertTrue(conn.unwrap(PhoenixConnection.class)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 1efbcf0..68c0d02 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -955,6 +955,7 @@ public class MutationState implements SQLCloseable {
         long tempSize;
         long deleteSize = 0, deleteCounter = 0;
         long upsertsize = 0, upsertCounter = 0;
+        long atomicUpsertsize = 0;
         if (GlobalClientMetrics.isMetricsEnabled()) {
             for (Mutation mutation : mutations) {
                 tempSize  = KeyValueUtil.calculateMutationDiskSize(mutation);
@@ -966,6 +967,9 @@ public class MutationState implements SQLCloseable {
                 }else if(mutation instanceof Put) {
                     upsertsize += tempSize;
                     upsertCounter++;
+                    if (mutation.getAttribute(PhoenixIndexBuilder.ATOMIC_OP_ATTRIB) != null) {
+                        atomicUpsertsize += tempSize;
+                    }
                     allDeletesMutations = false;
                 } else {
                     allUpsertsMutations = false;
@@ -976,7 +980,7 @@ public class MutationState implements SQLCloseable {
         if (updateGlobalClientMetrics) {
             GLOBAL_MUTATION_BYTES.update(byteSize);
         }
-        return new MutationBytes(deleteCounter, deleteSize, byteSize, upsertCounter, upsertsize);
+        return new MutationBytes(deleteCounter, deleteSize, byteSize, upsertCounter, upsertsize, atomicUpsertsize);
     }
 
     public long getBatchSizeBytes() {
@@ -994,14 +998,16 @@ public class MutationState implements SQLCloseable {
         private long totalMutationBytes;
         private long upsertMutationCounter;
         private long upsertMutationBytes;
+        private long atomicUpsertMutationBytes; // needed to calculate atomic upsert commit time
 
-        public MutationBytes(long deleteMutationCounter, long deleteMutationBytes, long totalMutationBytes, long
-                upsertMutationCounter, long upsertMutationBytes) {
+        public MutationBytes(long deleteMutationCounter, long deleteMutationBytes, long totalMutationBytes,
+                             long upsertMutationCounter, long upsertMutationBytes, long atomicUpsertMutationBytes) {
             this.deleteMutationCounter = deleteMutationCounter;
             this.deleteMutationBytes = deleteMutationBytes;
             this.totalMutationBytes = totalMutationBytes;
             this.upsertMutationCounter = upsertMutationCounter;
             this.upsertMutationBytes = upsertMutationBytes;
+            this.atomicUpsertMutationBytes = atomicUpsertMutationBytes;
         }
 
 
@@ -1024,6 +1030,8 @@ public class MutationState implements SQLCloseable {
         public long getUpsertMutationBytes() {
             return upsertMutationBytes;
         }
+
+        public long getAtomicUpsertMutationBytes() { return atomicUpsertMutationBytes; }
     }
 
     public enum MutationMetadataType {
@@ -1542,7 +1550,7 @@ public class MutationState implements SQLCloseable {
         // in case we are dealing with all deletes for a non-transactional table, since there is a
         // bug in sendMutations where we don't get the correct value for numFailedMutations when
         // we don't use transactions
-        return new MutationMetricQueue.MutationMetric(0, 0, 0, 0, 0,
+        return new MutationMetricQueue.MutationMetric(0, 0, 0, 0, 0, 0,
                 allDeletesMutations && !isTransactional ? numDeleteMutationsInBatch : numFailedMutations,
                 0, 0, 0, 0,
                 numUpsertMutationsInBatch,
@@ -1571,6 +1579,8 @@ public class MutationState implements SQLCloseable {
             long numFailedPhase3Mutations, long mutationCommitTime) {
         long committedUpsertMutationBytes = totalMutationBytesObject == null ? 0 :
                 totalMutationBytesObject.getUpsertMutationBytes();
+        long committedAtomicUpsertMutationBytes = totalMutationBytesObject == null ? 0:
+                totalMutationBytesObject.getAtomicUpsertMutationBytes();
         long committedDeleteMutationBytes = totalMutationBytesObject == null ? 0 :
                 totalMutationBytesObject.getDeleteMutationBytes();
         long committedUpsertMutationCounter = totalMutationBytesObject == null ? 0 :
@@ -1580,6 +1590,7 @@ public class MutationState implements SQLCloseable {
         long committedTotalMutationBytes = totalMutationBytesObject == null ? 0 :
                 totalMutationBytesObject.getTotalMutationBytes();
         long upsertMutationCommitTime = 0L;
+        long atomicUpsertMutationCommitTime = 0L;
         long deleteMutationCommitTime = 0L;
 
         if (totalMutationBytesObject != null && numFailedMutations != 0) {
@@ -1592,6 +1603,8 @@ public class MutationState implements SQLCloseable {
                     calculateMutationSize(uncommittedMutationsList, false);
             committedUpsertMutationBytes -=
                     uncommittedMutationBytesObject.getUpsertMutationBytes();
+            committedAtomicUpsertMutationBytes -=
+                    uncommittedMutationBytesObject.getAtomicUpsertMutationBytes();
             committedDeleteMutationBytes -=
                     uncommittedMutationBytesObject.getDeleteMutationBytes();
             committedUpsertMutationCounter -=
@@ -1606,6 +1619,9 @@ public class MutationState implements SQLCloseable {
             upsertMutationCommitTime =
                     (long)Math.floor((double)(committedUpsertMutationBytes * mutationCommitTime)/
                             committedTotalMutationBytes);
+            atomicUpsertMutationCommitTime =
+                (long)Math.floor((double)(committedAtomicUpsertMutationBytes * mutationCommitTime)/
+                            committedTotalMutationBytes);
             deleteMutationCommitTime =
                     (long)Math.ceil((double)(committedDeleteMutationBytes * mutationCommitTime)/
                             committedTotalMutationBytes);
@@ -1614,6 +1630,7 @@ public class MutationState implements SQLCloseable {
                 committedUpsertMutationBytes,
                 committedDeleteMutationBytes,
                 upsertMutationCommitTime,
+                atomicUpsertMutationCommitTime,
                 deleteMutationCommitTime,
                 0, // num failed mutations have been counted already in updateMutationBatchFailureMetrics()
                 committedUpsertMutationCounter,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index a70b39a..96b4f18 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -1101,19 +1101,26 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
         lockRows(context);
 
         boolean hasAtomic = hasAtomicUpdate(miniBatchOp);
+        long onDupCheckTime = 0;
+
         if (hasAtomic || hasGlobalIndex(indexMetaData)) {
             // Retrieve the current row states from the data table while holding the lock.
             // This is needed for both atomic mutations and global indexes
+            long start = EnvironmentEdgeManager.currentTimeMillis();
             getCurrentRowStates(c, context);
+            onDupCheckTime += (EnvironmentEdgeManager.currentTimeMillis() - start);
         }
 
         if (hasAtomic) {
+            long start = EnvironmentEdgeManager.currentTimeMillis();
             // add the mutations for conditional updates to the mini batch
             addOnDupMutationsToBatch(miniBatchOp, context);
 
             // release locks for ON DUPLICATE KEY IGNORE since we won't be changing those rows
             // this is needed so that we can exit early
             releaseLocksForOnDupIgnoreMutations(miniBatchOp, context);
+            onDupCheckTime += (EnvironmentEdgeManager.currentTimeMillis() - start);
+            metricSource.updateDuplicateKeyCheckTime(dataTableName, onDupCheckTime);
 
             // early exit if we are not changing any rows
             if (context.rowsToLock.isEmpty()) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 9d7d5f0..2a76887 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -20,6 +20,8 @@ package org.apache.phoenix.jdbc;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_SQL_COUNTER;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIME;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SELECT_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_SQL_QUERY_TIME;
 import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_FAILURE_SQL_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.DELETE_FAILED_SQL_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.DELETE_SQL_COUNTER;
@@ -511,6 +513,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
                             boolean success = false;
                             String tableName = null;
                             boolean isUpsert = false;
+                            boolean isAtomicUpsert = false;
                             boolean isDelete = false;
                             MutationState state = null;
                             MutationPlan plan = null;
@@ -525,6 +528,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
                                 plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.VALIDATE_SEQUENCE);
                                 isUpsert = stmt instanceof ExecutableUpsertStatement;
                                 isDelete = stmt instanceof ExecutableDeleteStatement;
+                                isAtomicUpsert = isUpsert && ((ExecutableUpsertStatement)stmt).getOnDupKeyPairs() != null;
                                 if (plan.getTargetRef() != null && plan.getTargetRef().getTable() != null) {
                                     if(!Strings.isNullOrEmpty(plan.getTargetRef().getTable().getPhysicalName().toString())) {
                                         tableName = plan.getTargetRef().getTable().getPhysicalName().toString();
@@ -596,6 +600,12 @@ public class PhoenixStatement implements Statement, SQLCloseable {
                                                 UPSERT_SQL_COUNTER : DELETE_SQL_COUNTER, 1);
                                         TableMetricsManager.updateMetricsMethod(tableName, isUpsert ?
                                                 UPSERT_SQL_QUERY_TIME : DELETE_SQL_QUERY_TIME, executeMutationTimeSpent);
+                                        if (isAtomicUpsert) {
+                                            TableMetricsManager.updateMetricsMethod(tableName,
+                                                ATOMIC_UPSERT_SQL_COUNTER, 1);
+                                            TableMetricsManager.updateMetricsMethod(tableName,
+                                                ATOMIC_UPSERT_SQL_QUERY_TIME, executeMutationTimeSpent);
+                                        }
 
                                         if (success) {
                                             TableMetricsManager.updateMetricsMethod(tableName, isUpsert ?
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
index dced4ca..ed7d003 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
@@ -56,6 +56,12 @@ public enum MetricType {
             + " autoCommit is true, the total time taken for executeMutation + conn.commit",
             LogLevel.OFF, PLong.INSTANCE),
 
+    ATOMIC_UPSERT_SQL_COUNTER("auc", "Counter for number of atomic upsert sql queries", LogLevel.OFF, PLong.INSTANCE),
+    ATOMIC_UPSERT_COMMIT_TIME("aut", "Time it took to commit a batch of atomic upserts", LogLevel.OFF, PLong.INSTANCE),
+    ATOMIC_UPSERT_SQL_QUERY_TIME("auqt", "Time taken by atomic upsert sql queries inside executeMutation or if"
+        + " autoCommit is true, the total time taken for executeMutation + conn.commit",
+        LogLevel.OFF, PLong.INSTANCE),
+
     // delete-specific metrics updated during executeMutation
     DELETE_SQL_COUNTER("dc", "Counter for number of delete sql queries", LogLevel.OFF, PLong.INSTANCE),
     DELETE_SUCCESS_SQL_COUNTER("dssc", "Counter for number of delete sql queries that successfully"
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
index b8444c3..a44483a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.monitoring;
 
+import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_COMMIT_TIME;
 import static org.apache.phoenix.monitoring.MetricType.DELETE_BATCH_FAILED_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.DELETE_BATCH_FAILED_SIZE;
 import static org.apache.phoenix.monitoring.MetricType.DELETE_COMMIT_TIME;
@@ -82,6 +83,7 @@ public class MutationMetricQueue {
             publishedMetricsForTable.put(metric.getDeleteMutationsSizeBytes().getMetricType(), metric.getDeleteMutationsSizeBytes().getValue());
             publishedMetricsForTable.put(metric.getCommitTimeForMutations().getMetricType(), metric.getCommitTimeForMutations().getValue());
             publishedMetricsForTable.put(metric.getTotalCommitTimeForUpserts().getMetricType(), metric.getTotalCommitTimeForUpserts().getValue());
+            publishedMetricsForTable.put(metric.getTotalCommitTimeForAtomicUpserts().getMetricType(), metric.getTotalCommitTimeForAtomicUpserts().getValue());
             publishedMetricsForTable.put(metric.getTotalCommitTimeForDeletes().getMetricType(), metric.getTotalCommitTimeForDeletes().getValue());
             publishedMetricsForTable.put(metric.getNumFailedMutations().getMetricType(), metric.getNumFailedMutations().getValue());
             publishedMetricsForTable.put(metric.getNumOfIndexCommitFailedMutations().getMetricType(), metric.getNumOfIndexCommitFailedMutations().getValue());
@@ -110,6 +112,7 @@ public class MutationMetricQueue {
         private final CombinableMetric totalCommitTimeForMutations = new CombinableMetricImpl(MUTATION_COMMIT_TIME);
         private final CombinableMetric numFailedMutations = new CombinableMetricImpl(MUTATION_BATCH_FAILED_SIZE);
         private final CombinableMetric totalCommitTimeForUpserts = new CombinableMetricImpl(UPSERT_COMMIT_TIME);
+        private final CombinableMetric totalCommitTimeForAtomicUpserts = new CombinableMetricImpl(ATOMIC_UPSERT_COMMIT_TIME);
         private final CombinableMetric totalCommitTimeForDeletes = new CombinableMetricImpl(DELETE_COMMIT_TIME);
         private final CombinableMetric upsertMutationsSizeBytes = new CombinableMetricImpl(UPSERT_MUTATION_BYTES);
         private final CombinableMetric deleteMutationsSizeBytes = new CombinableMetricImpl(DELETE_MUTATION_BYTES);
@@ -124,17 +127,18 @@ public class MutationMetricQueue {
                 INDEX_COMMIT_FAILURE_SIZE);
 
         public static final MutationMetric EMPTY_METRIC =
-                new MutationMetric(0,0,0,0,0,0,0,0,0,0,0,0,0,0);
+                new MutationMetric(0,0,0,0, 0, 0,0,0,0,0,0,0,0,0,0);
 
         public MutationMetric(long numMutations, long upsertMutationsSizeBytes,
-                long deleteMutationsSizeBytes, long commitTimeForUpserts, long commitTimeForDeletes,
-                long numFailedMutations, long upsertMutationSqlCounterSuccess,
+                long deleteMutationsSizeBytes, long commitTimeForUpserts, long commitTimeForAtomicUpserts,
+                long commitTimeForDeletes, long numFailedMutations, long upsertMutationSqlCounterSuccess,
                 long deleteMutationSqlCounterSuccess, long totalMutationBytes,
                 long numOfPhase3Failed, long upsertBatchFailedSize,
                 long upsertBatchFailedCounter, long deleteBatchFailedSize,
                 long deleteBatchFailedCounter) {
             this.numMutations.change(numMutations);
             this.totalCommitTimeForUpserts.change(commitTimeForUpserts);
+            this.totalCommitTimeForAtomicUpserts.change(commitTimeForAtomicUpserts);
             this.totalCommitTimeForDeletes.change(commitTimeForDeletes);
             this.totalCommitTimeForMutations.change(commitTimeForUpserts + commitTimeForDeletes);
             this.numFailedMutations.change(numFailedMutations);
@@ -154,6 +158,8 @@ public class MutationMetricQueue {
             return totalCommitTimeForUpserts;
         }
 
+        public CombinableMetric getTotalCommitTimeForAtomicUpserts() { return totalCommitTimeForAtomicUpserts; }
+
         public CombinableMetric getTotalCommitTimeForDeletes() {
             return totalCommitTimeForDeletes;
         }
@@ -213,6 +219,7 @@ public class MutationMetricQueue {
         public void combineMetric(MutationMetric other) {
             this.numMutations.combine(other.numMutations);
             this.totalCommitTimeForUpserts.combine(other.totalCommitTimeForUpserts);
+            this.totalCommitTimeForAtomicUpserts.combine(other.totalCommitTimeForAtomicUpserts);
             this.totalCommitTimeForDeletes.combine(other.totalCommitTimeForDeletes);
             this.totalCommitTimeForMutations.combine(other.totalCommitTimeForMutations);
             this.numFailedMutations.combine(other.numFailedMutations);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TableClientMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TableClientMetrics.java
index b640f29..13ef856 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TableClientMetrics.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TableClientMetrics.java
@@ -68,6 +68,9 @@ import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_SUCCESS_
 import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_FAILURE_SQL_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.SELECT_AGGREGATE_SUCCESS_SQL_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.SELECT_AGGREGATE_FAILURE_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_COMMIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_SQL_QUERY_TIME;
 
 /**
  * This is used by TableMetricsManager class to store instance of
@@ -121,7 +124,10 @@ public class TableClientMetrics {
                 DELETE_AGGREGATE_SUCCESS_SQL_COUNTER), TABLE_DELETE_AGGREGATE_FAILURE_SQL_COUNTER(
                 DELETE_AGGREGATE_FAILURE_SQL_COUNTER), TABLE_SELECT_AGGREGATE_SUCCESS_SQL_COUNTER(
                 SELECT_AGGREGATE_SUCCESS_SQL_COUNTER), TABLE_SELECT_AGGREGATE_FAILURE_SQL_COUNTER(
-                SELECT_AGGREGATE_FAILURE_SQL_COUNTER);
+                SELECT_AGGREGATE_FAILURE_SQL_COUNTER),
+                TABLE_ATOMIC_UPSERT_SQL_COUNTER(ATOMIC_UPSERT_SQL_COUNTER),
+                TABLE_ATOMIC_UPSERT_COMMIT_TIME(ATOMIC_UPSERT_COMMIT_TIME),
+                TABLE_ATOMIC_UPSERT_SQL_QUERY_TIME(ATOMIC_UPSERT_SQL_QUERY_TIME);
 
         private final MetricType metricType;
         private PhoenixTableMetric metric;