You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sk...@apache.org on 2021/06/07 17:46:19 UTC
[phoenix] branch PHOENIX-6387-4.x updated: PHOENIX-6474 Client and
server metrics for atomic upserts (#1237)
This is an automated email from the ASF dual-hosted git repository.
skadam pushed a commit to branch PHOENIX-6387-4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-6387-4.x by this push:
new 50160a0 PHOENIX-6474 Client and server metrics for atomic upserts (#1237)
50160a0 is described below
commit 50160a096a0d7dff609bdcd50024a76b70f4f84c
Author: tkhurana <kh...@gmail.com>
AuthorDate: Mon Jun 7 10:46:08 2021 -0700
PHOENIX-6474 Client and server metrics for atomic upserts (#1237)
* PHOENIX-6474 Client and server metrics for atomic upserts
* Fixed failing tests related to metrics
---
.../phoenix/monitoring/BasePhoenixMetricsIT.java | 2 +-
.../phoenix/monitoring/PhoenixMetricsIT.java | 2 +-
.../monitoring/PhoenixTableLevelMetricsIT.java | 46 ++++++++++++++++++++++
.../org/apache/phoenix/execute/MutationState.java | 25 ++++++++++--
.../phoenix/hbase/index/IndexRegionObserver.java | 7 ++++
.../org/apache/phoenix/jdbc/PhoenixStatement.java | 10 +++++
.../org/apache/phoenix/monitoring/MetricType.java | 6 +++
.../phoenix/monitoring/MutationMetricQueue.java | 13 ++++--
.../phoenix/monitoring/TableClientMetrics.java | 8 +++-
9 files changed, 109 insertions(+), 10 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java
index 7c58945..45b3561 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java
@@ -113,7 +113,7 @@ public class BasePhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
String t = entry.getKey();
assertEquals("Table names didn't match!", tableName, t);
Map<MetricType, Long> p = entry.getValue();
- assertEquals("There should have been fifteen metrics", 15, p.size());
+ assertEquals("There should have been sixteen metrics", 16, p.size());
boolean mutationBatchSizePresent = false;
boolean mutationCommitTimePresent = false;
boolean mutationBytesPresent = false;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
index dc27dee..32d2f67 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
@@ -487,7 +487,7 @@ public class PhoenixMetricsIT extends BasePhoenixMetricsIT {
String t = entry.getKey();
assertEquals("Table names didn't match!", tableName, t);
Map<MetricType, Long> p = entry.getValue();
- assertEquals("There should have been five metrics", 15, p.size());
+ assertEquals("There should have been sixteen metrics", 16, p.size());
boolean mutationBatchSizePresent = false;
boolean mutationCommitTimePresent = false;
boolean mutationBytesPresent = false;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
index b1215cd..2c1aaf9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
@@ -56,6 +56,8 @@ import org.junit.experimental.categories.Category;
import static org.apache.phoenix.exception.SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY;
import static org.apache.phoenix.exception.SQLExceptionCode.GET_TABLE_REGIONS_FAIL;
import static org.apache.phoenix.exception.SQLExceptionCode.OPERATION_TIMED_OUT;
+import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_COMMIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_SQL_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_FAILURE_SQL_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_SUCCESS_SQL_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.DELETE_BATCH_FAILED_COUNTER;
@@ -1148,6 +1150,50 @@ public class PhoenixTableLevelMetricsIT extends BaseUniqueNamesOwnClusterIT {
}
}
+ @Test public void testTableLevelMetricsForAtomicUpserts() throws Throwable { // Checks the table-level metrics recorded for ON DUPLICATE KEY UPDATE (atomic) upserts.
+ String tableName = generateUniqueName();
+ Connection conn = null;
+ Throwable exception = null; // set in the catch below and rethrown from the finally block
+ int numAtomicUpserts = 4;
+ try {
+ conn = getConnFromTestDriver();
+ String ddl = "create table " + tableName + "(pk varchar primary key, counter1 bigint)";
+ conn.createStatement().execute(ddl);
+ String dml;
+ ResultSet rs;
+ dml = String.format("UPSERT INTO %s VALUES('a', 0)", tableName);
+ conn.createStatement().execute(dml); // regular (non-atomic) upsert that creates row 'a' with counter1 = 0
+ dml = String.format("UPSERT INTO %s VALUES('a', 0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1", tableName); // atomic upsert: row 'a' already exists, so each execution increments counter1
+ for (int i = 0; i < numAtomicUpserts; ++i) {
+ conn.createStatement().execute(dml);
+ }
+ conn.commit(); // NOTE(review): presumably auto-commit is off, so this single commit sends all five upserts - confirm
+ String dql = String.format("SELECT counter1 FROM %s WHERE counter1 > 0", tableName);
+ rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+ assertEquals(4, rs.getInt(1)); // counter1 started at 0 and is incremented once per atomic upsert (numAtomicUpserts == 4)
+ }catch (Throwable t) {
+ exception = t; // remember the failure so the finally block can rethrow it
+ } finally {
+ // Otherwise the test fails with an error from assertions below instead of the real exception
+ if (exception != null) {
+ throw exception;
+ }
+ assertNotNull("Failed to get a connection!", conn);
+ // Get write metrics before closing the connection since that clears those metrics
+ Map<MetricType, Long>
+ writeMutMetrics =
+ getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
+ conn.close();
+ // 1 regular upsert + numAtomicUpserts
+ // 2 mutations (regular and atomic on the same row in the same batch will be split)
+ assertMutationTableMetrics(true, tableName, 1 + numAtomicUpserts, 0, 0, true, 2, 0, 0, 2, 0,
+ writeMutMetrics, conn);
+ assertEquals(numAtomicUpserts, getMetricFromTableMetrics(tableName, ATOMIC_UPSERT_SQL_COUNTER)); // one counter increment expected per atomic upsert statement
+ assertTrue(getMetricFromTableMetrics(tableName, ATOMIC_UPSERT_COMMIT_TIME) > 0); // a non-zero commit time must have been recorded for the atomic upserts
+ }
+ }
+
private Connection getConnFromTestDriver() throws SQLException {
Connection conn = DriverManager.getConnection(url);
assertTrue(conn.unwrap(PhoenixConnection.class)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 1efbcf0..68c0d02 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -955,6 +955,7 @@ public class MutationState implements SQLCloseable {
long tempSize;
long deleteSize = 0, deleteCounter = 0;
long upsertsize = 0, upsertCounter = 0;
+ long atomicUpsertsize = 0;
if (GlobalClientMetrics.isMetricsEnabled()) {
for (Mutation mutation : mutations) {
tempSize = KeyValueUtil.calculateMutationDiskSize(mutation);
@@ -966,6 +967,9 @@ public class MutationState implements SQLCloseable {
}else if(mutation instanceof Put) {
upsertsize += tempSize;
upsertCounter++;
+ if (mutation.getAttribute(PhoenixIndexBuilder.ATOMIC_OP_ATTRIB) != null) {
+ atomicUpsertsize += tempSize;
+ }
allDeletesMutations = false;
} else {
allUpsertsMutations = false;
@@ -976,7 +980,7 @@ public class MutationState implements SQLCloseable {
if (updateGlobalClientMetrics) {
GLOBAL_MUTATION_BYTES.update(byteSize);
}
- return new MutationBytes(deleteCounter, deleteSize, byteSize, upsertCounter, upsertsize);
+ return new MutationBytes(deleteCounter, deleteSize, byteSize, upsertCounter, upsertsize, atomicUpsertsize);
}
public long getBatchSizeBytes() {
@@ -994,14 +998,16 @@ public class MutationState implements SQLCloseable {
private long totalMutationBytes;
private long upsertMutationCounter;
private long upsertMutationBytes;
+ private long atomicUpsertMutationBytes; // needed to calculate atomic upsert commit time
- public MutationBytes(long deleteMutationCounter, long deleteMutationBytes, long totalMutationBytes, long
- upsertMutationCounter, long upsertMutationBytes) {
+ public MutationBytes(long deleteMutationCounter, long deleteMutationBytes, long totalMutationBytes,
+ long upsertMutationCounter, long upsertMutationBytes, long atomicUpsertMutationBytes) {
this.deleteMutationCounter = deleteMutationCounter;
this.deleteMutationBytes = deleteMutationBytes;
this.totalMutationBytes = totalMutationBytes;
this.upsertMutationCounter = upsertMutationCounter;
this.upsertMutationBytes = upsertMutationBytes;
+ this.atomicUpsertMutationBytes = atomicUpsertMutationBytes;
}
@@ -1024,6 +1030,8 @@ public class MutationState implements SQLCloseable {
public long getUpsertMutationBytes() {
return upsertMutationBytes;
}
+
+ public long getAtomicUpsertMutationBytes() { return atomicUpsertMutationBytes; } // returns the atomicUpsertMutationBytes value given to the constructor; needed to calculate atomic upsert commit time
}
public enum MutationMetadataType {
@@ -1542,7 +1550,7 @@ public class MutationState implements SQLCloseable {
// in case we are dealing with all deletes for a non-transactional table, since there is a
// bug in sendMutations where we don't get the correct value for numFailedMutations when
// we don't use transactions
- return new MutationMetricQueue.MutationMetric(0, 0, 0, 0, 0,
+ return new MutationMetricQueue.MutationMetric(0, 0, 0, 0, 0, 0,
allDeletesMutations && !isTransactional ? numDeleteMutationsInBatch : numFailedMutations,
0, 0, 0, 0,
numUpsertMutationsInBatch,
@@ -1571,6 +1579,8 @@ public class MutationState implements SQLCloseable {
long numFailedPhase3Mutations, long mutationCommitTime) {
long committedUpsertMutationBytes = totalMutationBytesObject == null ? 0 :
totalMutationBytesObject.getUpsertMutationBytes();
+ long committedAtomicUpsertMutationBytes = totalMutationBytesObject == null ? 0:
+ totalMutationBytesObject.getAtomicUpsertMutationBytes();
long committedDeleteMutationBytes = totalMutationBytesObject == null ? 0 :
totalMutationBytesObject.getDeleteMutationBytes();
long committedUpsertMutationCounter = totalMutationBytesObject == null ? 0 :
@@ -1580,6 +1590,7 @@ public class MutationState implements SQLCloseable {
long committedTotalMutationBytes = totalMutationBytesObject == null ? 0 :
totalMutationBytesObject.getTotalMutationBytes();
long upsertMutationCommitTime = 0L;
+ long atomicUpsertMutationCommitTime = 0L;
long deleteMutationCommitTime = 0L;
if (totalMutationBytesObject != null && numFailedMutations != 0) {
@@ -1592,6 +1603,8 @@ public class MutationState implements SQLCloseable {
calculateMutationSize(uncommittedMutationsList, false);
committedUpsertMutationBytes -=
uncommittedMutationBytesObject.getUpsertMutationBytes();
+ committedAtomicUpsertMutationBytes -=
+ uncommittedMutationBytesObject.getAtomicUpsertMutationBytes();
committedDeleteMutationBytes -=
uncommittedMutationBytesObject.getDeleteMutationBytes();
committedUpsertMutationCounter -=
@@ -1606,6 +1619,9 @@ public class MutationState implements SQLCloseable {
upsertMutationCommitTime =
(long)Math.floor((double)(committedUpsertMutationBytes * mutationCommitTime)/
committedTotalMutationBytes);
+ atomicUpsertMutationCommitTime =
+ (long)Math.floor((double)(committedAtomicUpsertMutationBytes * mutationCommitTime)/
+ committedTotalMutationBytes);
deleteMutationCommitTime =
(long)Math.ceil((double)(committedDeleteMutationBytes * mutationCommitTime)/
committedTotalMutationBytes);
@@ -1614,6 +1630,7 @@ public class MutationState implements SQLCloseable {
committedUpsertMutationBytes,
committedDeleteMutationBytes,
upsertMutationCommitTime,
+ atomicUpsertMutationCommitTime,
deleteMutationCommitTime,
0, // num failed mutations have been counted already in updateMutationBatchFailureMetrics()
committedUpsertMutationCounter,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index a70b39a..96b4f18 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -1101,19 +1101,26 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
lockRows(context);
boolean hasAtomic = hasAtomicUpdate(miniBatchOp);
+ long onDupCheckTime = 0;
+
if (hasAtomic || hasGlobalIndex(indexMetaData)) {
// Retrieve the current row states from the data table while holding the lock.
// This is needed for both atomic mutations and global indexes
+ long start = EnvironmentEdgeManager.currentTimeMillis();
getCurrentRowStates(c, context);
+ onDupCheckTime += (EnvironmentEdgeManager.currentTimeMillis() - start);
}
if (hasAtomic) {
+ long start = EnvironmentEdgeManager.currentTimeMillis();
// add the mutations for conditional updates to the mini batch
addOnDupMutationsToBatch(miniBatchOp, context);
// release locks for ON DUPLICATE KEY IGNORE since we won't be changing those rows
// this is needed so that we can exit early
releaseLocksForOnDupIgnoreMutations(miniBatchOp, context);
+ onDupCheckTime += (EnvironmentEdgeManager.currentTimeMillis() - start);
+ metricSource.updateDuplicateKeyCheckTime(dataTableName, onDupCheckTime);
// early exit if we are not changing any rows
if (context.rowsToLock.isEmpty()) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 9d7d5f0..2a76887 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -20,6 +20,8 @@ package org.apache.phoenix.jdbc;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_SQL_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIME;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SELECT_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_SQL_QUERY_TIME;
import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_FAILURE_SQL_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.DELETE_FAILED_SQL_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.DELETE_SQL_COUNTER;
@@ -511,6 +513,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
boolean success = false;
String tableName = null;
boolean isUpsert = false;
+ boolean isAtomicUpsert = false;
boolean isDelete = false;
MutationState state = null;
MutationPlan plan = null;
@@ -525,6 +528,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.VALIDATE_SEQUENCE);
isUpsert = stmt instanceof ExecutableUpsertStatement;
isDelete = stmt instanceof ExecutableDeleteStatement;
+ isAtomicUpsert = isUpsert && ((ExecutableUpsertStatement)stmt).getOnDupKeyPairs() != null;
if (plan.getTargetRef() != null && plan.getTargetRef().getTable() != null) {
if(!Strings.isNullOrEmpty(plan.getTargetRef().getTable().getPhysicalName().toString())) {
tableName = plan.getTargetRef().getTable().getPhysicalName().toString();
@@ -596,6 +600,12 @@ public class PhoenixStatement implements Statement, SQLCloseable {
UPSERT_SQL_COUNTER : DELETE_SQL_COUNTER, 1);
TableMetricsManager.updateMetricsMethod(tableName, isUpsert ?
UPSERT_SQL_QUERY_TIME : DELETE_SQL_QUERY_TIME, executeMutationTimeSpent);
+ if (isAtomicUpsert) {
+ TableMetricsManager.updateMetricsMethod(tableName,
+ ATOMIC_UPSERT_SQL_COUNTER, 1);
+ TableMetricsManager.updateMetricsMethod(tableName,
+ ATOMIC_UPSERT_SQL_QUERY_TIME, executeMutationTimeSpent);
+ }
if (success) {
TableMetricsManager.updateMetricsMethod(tableName, isUpsert ?
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
index dced4ca..ed7d003 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
@@ -56,6 +56,12 @@ public enum MetricType {
+ " autoCommit is true, the total time taken for executeMutation + conn.commit",
LogLevel.OFF, PLong.INSTANCE),
+ ATOMIC_UPSERT_SQL_COUNTER("auc", "Counter for number of atomic upsert sql queries", LogLevel.OFF, PLong.INSTANCE),
+ ATOMIC_UPSERT_COMMIT_TIME("aut", "Time it took to commit a batch of atomic upserts", LogLevel.OFF, PLong.INSTANCE),
+ ATOMIC_UPSERT_SQL_QUERY_TIME("auqt", "Time taken by atomic upsert sql queries inside executeMutation or if"
+ + " autoCommit is true, the total time taken for executeMutation + conn.commit",
+ LogLevel.OFF, PLong.INSTANCE),
+
// delete-specific metrics updated during executeMutation
DELETE_SQL_COUNTER("dc", "Counter for number of delete sql queries", LogLevel.OFF, PLong.INSTANCE),
DELETE_SUCCESS_SQL_COUNTER("dssc", "Counter for number of delete sql queries that successfully"
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
index b8444c3..a44483a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.monitoring;
+import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_COMMIT_TIME;
import static org.apache.phoenix.monitoring.MetricType.DELETE_BATCH_FAILED_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.DELETE_BATCH_FAILED_SIZE;
import static org.apache.phoenix.monitoring.MetricType.DELETE_COMMIT_TIME;
@@ -82,6 +83,7 @@ public class MutationMetricQueue {
publishedMetricsForTable.put(metric.getDeleteMutationsSizeBytes().getMetricType(), metric.getDeleteMutationsSizeBytes().getValue());
publishedMetricsForTable.put(metric.getCommitTimeForMutations().getMetricType(), metric.getCommitTimeForMutations().getValue());
publishedMetricsForTable.put(metric.getTotalCommitTimeForUpserts().getMetricType(), metric.getTotalCommitTimeForUpserts().getValue());
+ publishedMetricsForTable.put(metric.getTotalCommitTimeForAtomicUpserts().getMetricType(), metric.getTotalCommitTimeForAtomicUpserts().getValue());
publishedMetricsForTable.put(metric.getTotalCommitTimeForDeletes().getMetricType(), metric.getTotalCommitTimeForDeletes().getValue());
publishedMetricsForTable.put(metric.getNumFailedMutations().getMetricType(), metric.getNumFailedMutations().getValue());
publishedMetricsForTable.put(metric.getNumOfIndexCommitFailedMutations().getMetricType(), metric.getNumOfIndexCommitFailedMutations().getValue());
@@ -110,6 +112,7 @@ public class MutationMetricQueue {
private final CombinableMetric totalCommitTimeForMutations = new CombinableMetricImpl(MUTATION_COMMIT_TIME);
private final CombinableMetric numFailedMutations = new CombinableMetricImpl(MUTATION_BATCH_FAILED_SIZE);
private final CombinableMetric totalCommitTimeForUpserts = new CombinableMetricImpl(UPSERT_COMMIT_TIME);
+ private final CombinableMetric totalCommitTimeForAtomicUpserts = new CombinableMetricImpl(ATOMIC_UPSERT_COMMIT_TIME);
private final CombinableMetric totalCommitTimeForDeletes = new CombinableMetricImpl(DELETE_COMMIT_TIME);
private final CombinableMetric upsertMutationsSizeBytes = new CombinableMetricImpl(UPSERT_MUTATION_BYTES);
private final CombinableMetric deleteMutationsSizeBytes = new CombinableMetricImpl(DELETE_MUTATION_BYTES);
@@ -124,17 +127,18 @@ public class MutationMetricQueue {
INDEX_COMMIT_FAILURE_SIZE);
public static final MutationMetric EMPTY_METRIC =
- new MutationMetric(0,0,0,0,0,0,0,0,0,0,0,0,0,0);
+ new MutationMetric(0,0,0,0, 0, 0,0,0,0,0,0,0,0,0,0);
public MutationMetric(long numMutations, long upsertMutationsSizeBytes,
- long deleteMutationsSizeBytes, long commitTimeForUpserts, long commitTimeForDeletes,
- long numFailedMutations, long upsertMutationSqlCounterSuccess,
+ long deleteMutationsSizeBytes, long commitTimeForUpserts, long commitTimeForAtomicUpserts,
+ long commitTimeForDeletes, long numFailedMutations, long upsertMutationSqlCounterSuccess,
long deleteMutationSqlCounterSuccess, long totalMutationBytes,
long numOfPhase3Failed, long upsertBatchFailedSize,
long upsertBatchFailedCounter, long deleteBatchFailedSize,
long deleteBatchFailedCounter) {
this.numMutations.change(numMutations);
this.totalCommitTimeForUpserts.change(commitTimeForUpserts);
+ this.totalCommitTimeForAtomicUpserts.change(commitTimeForAtomicUpserts);
this.totalCommitTimeForDeletes.change(commitTimeForDeletes);
this.totalCommitTimeForMutations.change(commitTimeForUpserts + commitTimeForDeletes);
this.numFailedMutations.change(numFailedMutations);
@@ -154,6 +158,8 @@ public class MutationMetricQueue {
return totalCommitTimeForUpserts;
}
+ public CombinableMetric getTotalCommitTimeForAtomicUpserts() { return totalCommitTimeForAtomicUpserts; } // accessor for the ATOMIC_UPSERT_COMMIT_TIME combinable metric
+
public CombinableMetric getTotalCommitTimeForDeletes() {
return totalCommitTimeForDeletes;
}
@@ -213,6 +219,7 @@ public class MutationMetricQueue {
public void combineMetric(MutationMetric other) {
this.numMutations.combine(other.numMutations);
this.totalCommitTimeForUpserts.combine(other.totalCommitTimeForUpserts);
+ this.totalCommitTimeForAtomicUpserts.combine(other.totalCommitTimeForAtomicUpserts);
this.totalCommitTimeForDeletes.combine(other.totalCommitTimeForDeletes);
this.totalCommitTimeForMutations.combine(other.totalCommitTimeForMutations);
this.numFailedMutations.combine(other.numFailedMutations);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TableClientMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TableClientMetrics.java
index b640f29..13ef856 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TableClientMetrics.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TableClientMetrics.java
@@ -68,6 +68,9 @@ import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_SUCCESS_
import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_FAILURE_SQL_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.SELECT_AGGREGATE_SUCCESS_SQL_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.SELECT_AGGREGATE_FAILURE_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_COMMIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_SQL_QUERY_TIME;
/**
* This is used by TableMetricsManager class to store instance of
@@ -121,7 +124,10 @@ public class TableClientMetrics {
DELETE_AGGREGATE_SUCCESS_SQL_COUNTER), TABLE_DELETE_AGGREGATE_FAILURE_SQL_COUNTER(
DELETE_AGGREGATE_FAILURE_SQL_COUNTER), TABLE_SELECT_AGGREGATE_SUCCESS_SQL_COUNTER(
SELECT_AGGREGATE_SUCCESS_SQL_COUNTER), TABLE_SELECT_AGGREGATE_FAILURE_SQL_COUNTER(
- SELECT_AGGREGATE_FAILURE_SQL_COUNTER);
+ SELECT_AGGREGATE_FAILURE_SQL_COUNTER),
+ TABLE_ATOMIC_UPSERT_SQL_COUNTER(ATOMIC_UPSERT_SQL_COUNTER),
+ TABLE_ATOMIC_UPSERT_COMMIT_TIME(ATOMIC_UPSERT_COMMIT_TIME),
+ TABLE_ATOMIC_UPSERT_SQL_QUERY_TIME(ATOMIC_UPSERT_SQL_QUERY_TIME);
private final MetricType metricType;
private PhoenixTableMetric metric;