You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2022/05/05 22:13:01 UTC
[kudu] 02/02: [client] KUDU-3365: Expose INSERT/UPDATE metrics in the Java API
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 03f266854a6ab7ff1732d380c722ba64c018ecc1
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Tue May 3 17:38:55 2022 -0700
[client] KUDU-3365: Expose INSERT/UPDATE metrics in the Java API
The work done in the scope of KUDU-3351 included the server-side changes
and corresponding changes in the Kudu C++ API to expose the metrics to
the client applications. This patch implements similar changes to expose
such metrics in the Java client API.
Change-Id: I956eb0c0a2cadcf3491550630b861bb48462e8eb
Reviewed-on: http://gerrit.cloudera.org:8080/18489
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <al...@apache.org>
---
.../org/apache/kudu/client/AsyncKuduSession.java | 12 +++++++++
.../main/java/org/apache/kudu/client/Batch.java | 5 +++-
.../java/org/apache/kudu/client/BatchResponse.java | 17 +++++++++++-
.../java/org/apache/kudu/client/KuduSession.java | 5 ++++
.../java/org/apache/kudu/client/Operation.java | 5 +++-
.../org/apache/kudu/client/OperationResponse.java | 17 +++++++++++-
.../org/apache/kudu/client/ResourceMetrics.java | 28 +++++++++++++++++++-
.../apache/kudu/client/SessionConfiguration.java | 6 +++++
.../org/apache/kudu/client/TestKuduSession.java | 30 ++++++++++++++++++++++
9 files changed, 120 insertions(+), 5 deletions(-)
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
index 07397330e..9c0ea6912 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
@@ -183,6 +183,11 @@ public class AsyncKuduSession implements SessionConfiguration {
private boolean ignoreAllDuplicateRows = false;
private boolean ignoreAllNotFoundRows = false;
+ /**
+ * Cumulative operation metrics since the beginning of the session.
+ */
+ private final ResourceMetrics writeOpMetrics = new ResourceMetrics();
+
/**
* Package-private constructor meant to be used via AsyncKuduClient
* @param client client that creates this session
@@ -317,6 +322,11 @@ public class AsyncKuduSession implements SessionConfiguration {
return errorCollector.getErrors();
}
+ @Override
+ public ResourceMetrics getWriteOpMetrics() {
+ return this.writeOpMetrics;
+ }
+
/**
* Flushes the buffered operations and marks this session as closed.
* See the javadoc on {@link #flush()} on how to deal with exceptions coming out of this method.
@@ -452,6 +462,7 @@ public class AsyncKuduSession implements SessionConfiguration {
// are visible should the callback interrogate the error collector.
operationResponse.getOperation().callback(operationResponse);
}
+ writeOpMetrics.update(response.getWriteOpMetrics());
return response;
}
@@ -624,6 +635,7 @@ public class AsyncKuduSession implements SessionConfiguration {
return client.sendRpcToTablet(operation)
.addCallbackDeferring(resp -> {
client.updateLastPropagatedTimestamp(resp.getWriteTimestampRaw());
+ writeOpMetrics.update(resp.getWriteOpMetrics());
return Deferred.fromResult(resp);
})
.addErrback(new SingleOperationErrCallback(operation));
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
index 0c5c578b6..a248802f8 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
@@ -184,12 +184,15 @@ class Batch extends KuduRpc<BatchResponse> {
}
errorsPB = filteredErrors;
}
+ ResourceMetrics metrics = builder.hasResourceMetrics() ?
+ ResourceMetrics.fromResourceMetricsPB(builder.getResourceMetrics()) : null;
BatchResponse response = new BatchResponse(timeoutTracker.getElapsedMillis(),
tsUUID,
builder.getTimestamp(),
errorsPB,
operations,
- operationIndexes);
+ operationIndexes,
+ metrics);
if (injectedError != null) {
if (injectedlatencyMs > 0) {
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/BatchResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/BatchResponse.java
index 24078eb4c..f11f930b3 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/BatchResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/BatchResponse.java
@@ -20,6 +20,7 @@ package org.apache.kudu.client;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import javax.annotation.Nullable;
import com.google.common.collect.ImmutableList;
import org.apache.yetus.audience.InterfaceAudience;
@@ -37,6 +38,7 @@ public class BatchResponse extends KuduRpcResponse {
private final List<RowError> rowErrors;
private final List<OperationResponse> individualResponses;
private final List<Integer> responsesIndexes;
+ private final ResourceMetrics writeOpMetrics;
/**
* Package-private constructor to be used by the RPCs.
@@ -45,13 +47,15 @@ public class BatchResponse extends KuduRpcResponse {
* @param errorsPB a list of row errors, can be empty
* @param operations the list of operations which created this response
* @param indexes the list of operations' order index
+ * @param writeOpMetrics the write operation metrics, can be null
*/
BatchResponse(long elapsedMillis,
String tsUUID,
long writeTimestamp,
List<Tserver.WriteResponsePB.PerRowErrorPB> errorsPB,
List<Operation> operations,
- List<Integer> indexes) {
+ List<Integer> indexes,
+ ResourceMetrics writeOpMetrics) {
super(elapsedMillis, tsUUID);
this.writeTimestamp = writeTimestamp;
individualResponses = new ArrayList<>(operations.size());
@@ -61,6 +65,7 @@ public class BatchResponse extends KuduRpcResponse {
} else {
rowErrors = new ArrayList<>(errorsPB.size());
}
+ this.writeOpMetrics = writeOpMetrics;
// Populate the list of individual row responses and the list of row errors. Not all the rows
// maybe have errors, but 'errorsPB' contains them in the same order as the operations that
@@ -95,6 +100,7 @@ public class BatchResponse extends KuduRpcResponse {
rowErrors = ImmutableList.of();
this.individualResponses = individualResponses;
this.responsesIndexes = indexes;
+ this.writeOpMetrics = null;
}
/**
@@ -122,4 +128,13 @@ public class BatchResponse extends KuduRpcResponse {
return responsesIndexes;
}
+ /**
+ * Return the write operation metrics associated with this batch.
+ * @return write operation metrics associated with this batch, or null if there is none.
+ */
+ @Nullable
+ ResourceMetrics getWriteOpMetrics() {
+ return this.writeOpMetrics;
+ }
+
}
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java
index 8f0870ef7..b1845bd5f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java
@@ -203,4 +203,9 @@ public class KuduSession implements SessionConfiguration {
public RowErrorsAndOverflowStatus getPendingErrors() {
return session.getPendingErrors();
}
+
+ @Override
+ public ResourceMetrics getWriteOpMetrics() {
+ return session.getWriteOpMetrics();
+ }
}
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
index 4d871c9ad..0ed9caca2 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
@@ -223,11 +223,14 @@ public abstract class Operation extends KuduRpc<OperationResponse> {
error = null;
}
}
+ Tserver.ResourceMetricsPB metricsPB = builder.hasResourceMetrics() ?
+ builder.getResourceMetrics() : null;
OperationResponse response = new OperationResponse(timeoutTracker.getElapsedMillis(),
tsUUID,
builder.getTimestamp(),
this,
- error);
+ error,
+ metricsPB);
return new Pair<>(response, builder.hasError() ? builder.getError() : null);
}
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/OperationResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/OperationResponse.java
index d4234ac67..ca46387bf 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/OperationResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/OperationResponse.java
@@ -19,6 +19,7 @@ package org.apache.kudu.client;
import java.util.ArrayList;
import java.util.List;
+import javax.annotation.Nullable;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
@@ -32,6 +33,7 @@ public class OperationResponse extends KuduRpcResponse {
private final long writeTimestamp;
private final RowError rowError;
private final Operation operation;
+ private final ResourceMetrics writeOpMetrics;
/**
* Package-private constructor to build an OperationResponse with a row error in the pb format.
@@ -44,11 +46,14 @@ public class OperationResponse extends KuduRpcResponse {
String tsUUID,
long writeTimestamp,
Operation operation,
- Tserver.WriteResponsePB.PerRowErrorPB errorPB) {
+ Tserver.WriteResponsePB.PerRowErrorPB errorPB,
+ Tserver.ResourceMetricsPB metricsPB) {
super(elapsedMillis, tsUUID);
this.writeTimestamp = writeTimestamp;
this.rowError = errorPB == null ? null : RowError.fromRowErrorPb(errorPB, operation, tsUUID);
this.operation = operation;
+ this.writeOpMetrics = metricsPB == null ?
+ null : ResourceMetrics.fromResourceMetricsPB(metricsPB);
}
/**
@@ -67,6 +72,7 @@ public class OperationResponse extends KuduRpcResponse {
this.writeTimestamp = writeTimestamp;
this.rowError = rowError;
this.operation = operation;
+ this.writeOpMetrics = null;
}
/**
@@ -116,4 +122,13 @@ public class OperationResponse extends KuduRpcResponse {
Operation getOperation() {
return operation;
}
+
+ /**
+ * Return the write operation metrics associated with this operation.
+ * @return write operation metrics associated with this operation, or null if there is none.
+ */
+ @Nullable
+ ResourceMetrics getWriteOpMetrics() {
+ return this.writeOpMetrics;
+ }
}
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ResourceMetrics.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ResourceMetrics.java
index 01ee22b93..1e7e7cb22 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ResourceMetrics.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ResourceMetrics.java
@@ -35,7 +35,7 @@ import org.apache.kudu.tserver.Tserver.ResourceMetricsPB;
* A container for scanner resource metrics.
* <p>
* This class wraps a mapping from metric name to metric value for server-side
- * metrics associated with a scanner.
+ * metrics associated with a scanner or a write operation.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
@@ -75,6 +75,20 @@ public class ResourceMetrics {
}
}
+ /**
+ * Increment this instance's metric values with those found in 'resourceMetrics'.
+ * Noop if 'resourceMetrics' is null.
+ * @param resourceMetrics resource metrics object to be used to update this object.
+ * Can be null, which will not do anything.
+ */
+ void update(ResourceMetrics resourceMetrics) {
+ if (resourceMetrics != null) {
+ for (Map.Entry<String, LongAdder> entry : resourceMetrics.metrics.entrySet()) {
+ increment(entry.getKey(), entry.getValue().sum());
+ }
+ }
+ }
+
/**
* Increment the metric value by the specific amount.
* @param name the name of the metric whose value is to be incremented
@@ -83,4 +97,16 @@ public class ResourceMetrics {
private void increment(String name, long amount) {
metrics.computeIfAbsent(name, k -> new LongAdder()).add(amount);
}
+
+ /**
+ * Converts a ResourceMetricsPB into a ResourceMetrics.
+ * @param resourceMetricsPb a resource metrics in its PB format. Must not be null.
+ * @return a ResourceMetrics
+ */
+ static ResourceMetrics fromResourceMetricsPB(ResourceMetricsPB resourceMetricsPb) {
+ Preconditions.checkNotNull(resourceMetricsPb);
+ ResourceMetrics result = new ResourceMetrics();
+ result.update(resourceMetricsPb);
+ return result;
+ }
}
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java b/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java
index d3e942d0a..dfb756652 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java
@@ -207,4 +207,10 @@ public interface SessionConfiguration {
* @return an object that contains the errors and the overflow status
*/
RowErrorsAndOverflowStatus getPendingErrors();
+
+ /**
+ * Return cumulative write operation metrics since the beginning of the session.
+ * @return cumulative write operation metrics since the beginning of the session.
+ */
+ ResourceMetrics getWriteOpMetrics();
}
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java
index 062fc6747..905ccbe74 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java
@@ -344,6 +344,24 @@ public class TestKuduSession {
}
}
+ private void doVerifyMetrics(KuduSession session,
+ long successfulInserts,
+ long insertIgnoreErrors,
+ long successfulUpserts,
+ long successfulUpdates,
+ long updateIgnoreErrors,
+ long successfulDeletes,
+ long deleteIgnoreErrors) {
+ ResourceMetrics metrics = session.getWriteOpMetrics();
+ assertEquals(successfulInserts, metrics.getMetric("successful_inserts"));
+ assertEquals(insertIgnoreErrors, metrics.getMetric("insert_ignore_errors"));
+ assertEquals(successfulUpserts, metrics.getMetric("successful_upserts"));
+ assertEquals(successfulUpdates, metrics.getMetric("successful_updates"));
+ assertEquals(updateIgnoreErrors, metrics.getMetric("update_ignore_errors"));
+ assertEquals(successfulDeletes, metrics.getMetric("successful_deletes"));
+ assertEquals(deleteIgnoreErrors, metrics.getMetric("delete_ignore_errors"));
+ }
+
@Test(timeout = 10000)
public void testUpsert() throws Exception {
KuduTable table = client.createTable(tableName, basicSchema, getBasicCreateTableOptions());
@@ -358,6 +376,7 @@ public class TestKuduSession {
"INT32 key=1, INT32 column1_i=1, INT32 column2_i=3, " +
"STRING column3_s=a string, BOOL column4_b=true",
rowStrings.get(0));
+ doVerifyMetrics(session, 0, 0, 1, 0, 0, 0, 0);
// Test an Upsert that acts as an Update.
assertFalse(session.apply(createUpsert(table, 1, 2, false)).hasRowError());
@@ -366,6 +385,7 @@ public class TestKuduSession {
"INT32 key=1, INT32 column1_i=2, INT32 column2_i=3, " +
"STRING column3_s=a string, BOOL column4_b=true",
rowStrings.get(0));
+ doVerifyMetrics(session, 0, 0, 2, 0, 0, 0, 0);
}
@Test(timeout = 10000)
@@ -378,6 +398,7 @@ public class TestKuduSession {
session.apply(createUpsert(table, 1, 1, false));
session.apply(createInsertIgnore(table, 1));
List<OperationResponse> results = session.flush();
+ doVerifyMetrics(session, 1, 1, 1, 0, 0, 0, 0);
for (OperationResponse result : results) {
assertFalse(result.toString(), result.hasRowError());
}
@@ -398,6 +419,7 @@ public class TestKuduSession {
session.apply(createInsertIgnore(table, 1));
session.apply(createInsert(table, 1));
List<OperationResponse> results = session.flush();
+ doVerifyMetrics(session, 1, 0, 0, 0, 0, 0, 0);
assertFalse(results.get(0).toString(), results.get(0).hasRowError());
assertTrue(results.get(1).toString(), results.get(1).hasRowError());
assertTrue(results.get(1).getRowError().getErrorStatus().isAlreadyPresent());
@@ -421,6 +443,7 @@ public class TestKuduSession {
"INT32 key=1, INT32 column1_i=2, INT32 column2_i=3, " +
"STRING column3_s=a string, BOOL column4_b=true",
rowStrings.get(0));
+ doVerifyMetrics(session, 1, 0, 0, 0, 0, 0, 0);
// Test insert ignore does not return a row error.
assertFalse(session.apply(createInsertIgnore(table, 1)).hasRowError());
@@ -429,6 +452,7 @@ public class TestKuduSession {
"INT32 key=1, INT32 column1_i=2, INT32 column2_i=3, " +
"STRING column3_s=a string, BOOL column4_b=true",
rowStrings.get(0));
+ doVerifyMetrics(session, 1, 1, 0, 0, 0, 0, 0);
}
@@ -440,9 +464,11 @@ public class TestKuduSession {
// Test update ignore does not return a row error.
assertFalse(session.apply(createUpdateIgnore(table, 1, 1, false)).hasRowError());
assertEquals(0, scanTableToStrings(table).size());
+ doVerifyMetrics(session, 0, 0, 0, 0, 1, 0, 0);
assertFalse(session.apply(createInsert(table, 1)).hasRowError());
assertEquals(1, scanTableToStrings(table).size());
+ doVerifyMetrics(session, 1, 0, 0, 0, 1, 0, 0);
// Test update ignore implements normal update.
assertFalse(session.apply(createUpdateIgnore(table, 1, 2, false)).hasRowError());
@@ -452,6 +478,7 @@ public class TestKuduSession {
"INT32 key=1, INT32 column1_i=2, INT32 column2_i=3, " +
"STRING column3_s=a string, BOOL column4_b=true",
rowStrings.get(0));
+ doVerifyMetrics(session, 1, 0, 0, 1, 1, 0, 0);
}
@Test(timeout = 10000)
@@ -461,13 +488,16 @@ public class TestKuduSession {
// Test delete ignore does not return a row error.
assertFalse(session.apply(createDeleteIgnore(table, 1)).hasRowError());
+ doVerifyMetrics(session, 0, 0, 0, 0, 0, 0, 1);
assertFalse(session.apply(createInsert(table, 1)).hasRowError());
assertEquals(1, scanTableToStrings(table).size());
+ doVerifyMetrics(session, 1, 0, 0, 0, 0, 0, 1);
// Test delete ignore implements normal delete.
assertFalse(session.apply(createDeleteIgnore(table, 1)).hasRowError());
assertEquals(0, scanTableToStrings(table).size());
+ doVerifyMetrics(session, 1, 0, 0, 0, 0, 1, 1);
}
@Test(timeout = 10000)