You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2022/05/05 22:13:01 UTC

[kudu] 02/02: [client] KUDU-3365: Expose INSERT/UPDATE metrics in the Java API

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 03f266854a6ab7ff1732d380c722ba64c018ecc1
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Tue May 3 17:38:55 2022 -0700

    [client] KUDU-3365: Expose INSERT/UPDATE metrics in the Java API
    
    The work done in the scope of KUDU-3351 included the server-side changes
    and corresponding changes in the Kudu C++ API to expose the metrics to
    the client applications. This patch implements similar changes to expose
    such metrics in the Java client API.
    
    Change-Id: I956eb0c0a2cadcf3491550630b861bb48462e8eb
    Reviewed-on: http://gerrit.cloudera.org:8080/18489
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <al...@apache.org>
---
 .../org/apache/kudu/client/AsyncKuduSession.java   | 12 +++++++++
 .../main/java/org/apache/kudu/client/Batch.java    |  5 +++-
 .../java/org/apache/kudu/client/BatchResponse.java | 17 +++++++++++-
 .../java/org/apache/kudu/client/KuduSession.java   |  5 ++++
 .../java/org/apache/kudu/client/Operation.java     |  5 +++-
 .../org/apache/kudu/client/OperationResponse.java  | 17 +++++++++++-
 .../org/apache/kudu/client/ResourceMetrics.java    | 28 +++++++++++++++++++-
 .../apache/kudu/client/SessionConfiguration.java   |  6 +++++
 .../org/apache/kudu/client/TestKuduSession.java    | 30 ++++++++++++++++++++++
 9 files changed, 120 insertions(+), 5 deletions(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
index 07397330e..9c0ea6912 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
@@ -183,6 +183,11 @@ public class AsyncKuduSession implements SessionConfiguration {
   private boolean ignoreAllDuplicateRows = false;
   private boolean ignoreAllNotFoundRows = false;
 
+  /**
+   * Cumulative operation metrics since the beginning of the session.
+   */
+  private final ResourceMetrics writeOpMetrics = new ResourceMetrics();
+
   /**
    * Package-private constructor meant to be used via AsyncKuduClient
    * @param client client that creates this session
@@ -317,6 +322,11 @@ public class AsyncKuduSession implements SessionConfiguration {
     return errorCollector.getErrors();
   }
 
+  @Override
+  public ResourceMetrics getWriteOpMetrics() {
+    return this.writeOpMetrics;
+  }
+
   /**
    * Flushes the buffered operations and marks this session as closed.
    * See the javadoc on {@link #flush()} on how to deal with exceptions coming out of this method.
@@ -452,6 +462,7 @@ public class AsyncKuduSession implements SessionConfiguration {
             // are visible should the callback interrogate the error collector.
             operationResponse.getOperation().callback(operationResponse);
           }
+          writeOpMetrics.update(response.getWriteOpMetrics());
 
           return response;
         }
@@ -624,6 +635,7 @@ public class AsyncKuduSession implements SessionConfiguration {
     return client.sendRpcToTablet(operation)
         .addCallbackDeferring(resp -> {
           client.updateLastPropagatedTimestamp(resp.getWriteTimestampRaw());
+          writeOpMetrics.update(resp.getWriteOpMetrics());
           return Deferred.fromResult(resp);
         })
         .addErrback(new SingleOperationErrCallback(operation));
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
index 0c5c578b6..a248802f8 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
@@ -184,12 +184,15 @@ class Batch extends KuduRpc<BatchResponse> {
       }
       errorsPB = filteredErrors;
     }
+    ResourceMetrics metrics = builder.hasResourceMetrics() ?
+        ResourceMetrics.fromResourceMetricsPB(builder.getResourceMetrics()) : null;
     BatchResponse response = new BatchResponse(timeoutTracker.getElapsedMillis(),
                                                tsUUID,
                                                builder.getTimestamp(),
                                                errorsPB,
                                                operations,
-                                               operationIndexes);
+                                               operationIndexes,
+                                               metrics);
 
     if (injectedError != null) {
       if (injectedlatencyMs > 0) {
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/BatchResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/BatchResponse.java
index 24078eb4c..f11f930b3 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/BatchResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/BatchResponse.java
@@ -20,6 +20,7 @@ package org.apache.kudu.client;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import javax.annotation.Nullable;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -37,6 +38,7 @@ public class BatchResponse extends KuduRpcResponse {
   private final List<RowError> rowErrors;
   private final List<OperationResponse> individualResponses;
   private final List<Integer> responsesIndexes;
+  private final ResourceMetrics writeOpMetrics;
 
   /**
    * Package-private constructor to be used by the RPCs.
@@ -45,13 +47,15 @@ public class BatchResponse extends KuduRpcResponse {
    * @param errorsPB a list of row errors, can be empty
    * @param operations the list of operations which created this response
    * @param indexes the list of operations' order index
+   * @param writeOpMetrics the write operation metrics, can be null
    */
   BatchResponse(long elapsedMillis,
                 String tsUUID,
                 long writeTimestamp,
                 List<Tserver.WriteResponsePB.PerRowErrorPB> errorsPB,
                 List<Operation> operations,
-                List<Integer> indexes) {
+                List<Integer> indexes,
+                ResourceMetrics writeOpMetrics) {
     super(elapsedMillis, tsUUID);
     this.writeTimestamp = writeTimestamp;
     individualResponses = new ArrayList<>(operations.size());
@@ -61,6 +65,7 @@ public class BatchResponse extends KuduRpcResponse {
     } else {
       rowErrors = new ArrayList<>(errorsPB.size());
     }
+    this.writeOpMetrics = writeOpMetrics;
 
     // Populate the list of individual row responses and the list of row errors. Not all the rows
     // maybe have errors, but 'errorsPB' contains them in the same order as the operations that
@@ -95,6 +100,7 @@ public class BatchResponse extends KuduRpcResponse {
     rowErrors = ImmutableList.of();
     this.individualResponses = individualResponses;
     this.responsesIndexes = indexes;
+    this.writeOpMetrics = null;
   }
 
   /**
@@ -122,4 +128,13 @@ public class BatchResponse extends KuduRpcResponse {
     return responsesIndexes;
   }
 
+  /**
+   * Return the write operation metrics associated with this batch.
+   * @return write operation metrics associated with this batch, or null if there is none.
+   */
+  @Nullable
+  ResourceMetrics getWriteOpMetrics() {
+    return this.writeOpMetrics;
+  }
+
 }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java
index 8f0870ef7..b1845bd5f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java
@@ -203,4 +203,9 @@ public class KuduSession implements SessionConfiguration {
   public RowErrorsAndOverflowStatus getPendingErrors() {
     return session.getPendingErrors();
   }
+
+  @Override
+  public ResourceMetrics getWriteOpMetrics() {
+    return session.getWriteOpMetrics();
+  }
 }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
index 4d871c9ad..0ed9caca2 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
@@ -223,11 +223,14 @@ public abstract class Operation extends KuduRpc<OperationResponse> {
         error = null;
       }
     }
+    Tserver.ResourceMetricsPB metricsPB = builder.hasResourceMetrics() ?
+            builder.getResourceMetrics() : null;
     OperationResponse response = new OperationResponse(timeoutTracker.getElapsedMillis(),
                                                        tsUUID,
                                                        builder.getTimestamp(),
                                                        this,
-                                                       error);
+                                                       error,
+                                                       metricsPB);
     return new Pair<>(response, builder.hasError() ? builder.getError() : null);
   }
 
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/OperationResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/OperationResponse.java
index d4234ac67..ca46387bf 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/OperationResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/OperationResponse.java
@@ -19,6 +19,7 @@ package org.apache.kudu.client;
 
 import java.util.ArrayList;
 import java.util.List;
+import javax.annotation.Nullable;
 
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
@@ -32,6 +33,7 @@ public class OperationResponse extends KuduRpcResponse {
   private final long writeTimestamp;
   private final RowError rowError;
   private final Operation operation;
+  private final ResourceMetrics writeOpMetrics;
 
   /**
    * Package-private constructor to build an OperationResponse with a row error in the pb format.
@@ -44,11 +46,14 @@ public class OperationResponse extends KuduRpcResponse {
                     String tsUUID,
                     long writeTimestamp,
                     Operation operation,
-                    Tserver.WriteResponsePB.PerRowErrorPB errorPB) {
+                    Tserver.WriteResponsePB.PerRowErrorPB errorPB,
+                    Tserver.ResourceMetricsPB metricsPB) {
     super(elapsedMillis, tsUUID);
     this.writeTimestamp = writeTimestamp;
     this.rowError = errorPB == null ? null : RowError.fromRowErrorPb(errorPB, operation, tsUUID);
     this.operation = operation;
+    this.writeOpMetrics = metricsPB == null ?
+            null : ResourceMetrics.fromResourceMetricsPB(metricsPB);
   }
 
   /**
@@ -67,6 +72,7 @@ public class OperationResponse extends KuduRpcResponse {
     this.writeTimestamp = writeTimestamp;
     this.rowError = rowError;
     this.operation = operation;
+    this.writeOpMetrics = null;
   }
 
   /**
@@ -116,4 +122,13 @@ public class OperationResponse extends KuduRpcResponse {
   Operation getOperation() {
     return operation;
   }
+
+  /**
+   * Return the write operation metrics associated with this operation.
+   * @return write operation metrics associated with this operation, or null if there is none.
+   */
+  @Nullable
+  ResourceMetrics getWriteOpMetrics() {
+    return this.writeOpMetrics;
+  }
 }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ResourceMetrics.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ResourceMetrics.java
index 01ee22b93..1e7e7cb22 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ResourceMetrics.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ResourceMetrics.java
@@ -35,7 +35,7 @@ import org.apache.kudu.tserver.Tserver.ResourceMetricsPB;
  * A container for scanner resource metrics.
  * <p>
  * This class wraps a mapping from metric name to metric value for server-side
- * metrics associated with a scanner.
+ * metrics associated with a scanner or a write operation.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
@@ -75,6 +75,20 @@ public class ResourceMetrics {
     }
   }
 
+  /**
+   * Increment this instance's metric values with those found in 'resourceMetrics'.
+   * Noop if 'resourceMetrics' is null.
+   * @param resourceMetrics resource metrics object to be used to update this object.
+   *                        Can be null, which will not do anything.
+   */
+  void update(ResourceMetrics resourceMetrics) {
+    if (resourceMetrics != null) {
+      for (Map.Entry<String, LongAdder> entry : resourceMetrics.metrics.entrySet()) {
+        increment(entry.getKey(), entry.getValue().sum());
+      }
+    }
+  }
+
   /**
    * Increment the metric value by the specific amount.
    * @param name the name of the metric whose value is to be incremented
@@ -83,4 +97,16 @@ public class ResourceMetrics {
   private void increment(String name, long amount) {
     metrics.computeIfAbsent(name, k -> new LongAdder()).add(amount);
   }
+
+  /**
+   * Converts a ResourceMetricsPB into a ResourceMetrics.
+   * @param resourceMetricsPb a resource metrics in its PB format. Must not be null.
+   * @return a ResourceMetrics
+   */
+  static ResourceMetrics fromResourceMetricsPB(ResourceMetricsPB resourceMetricsPb) {
+    Preconditions.checkNotNull(resourceMetricsPb);
+    ResourceMetrics result = new ResourceMetrics();
+    result.update(resourceMetricsPb);
+    return result;
+  }
 }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java b/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java
index d3e942d0a..dfb756652 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java
@@ -207,4 +207,10 @@ public interface SessionConfiguration {
    * @return an object that contains the errors and the overflow status
    */
   RowErrorsAndOverflowStatus getPendingErrors();
+
+  /**
+   * Return cumulative write operation metrics since the beginning of the session.
+   * @return cumulative write operation metrics since the beginning of the session.
+   */
+  ResourceMetrics getWriteOpMetrics();
 }
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java
index 062fc6747..905ccbe74 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java
@@ -344,6 +344,24 @@ public class TestKuduSession {
     }
   }
 
+  private void doVerifyMetrics(KuduSession session,
+                              long successfulInserts,
+                              long insertIgnoreErrors,
+                              long successfulUpserts,
+                              long successfulUpdates,
+                              long updateIgnoreErrors,
+                              long successfulDeletes,
+                              long deleteIgnoreErrors) {
+    ResourceMetrics metrics = session.getWriteOpMetrics();
+    assertEquals(successfulInserts, metrics.getMetric("successful_inserts"));
+    assertEquals(insertIgnoreErrors, metrics.getMetric("insert_ignore_errors"));
+    assertEquals(successfulUpserts, metrics.getMetric("successful_upserts"));
+    assertEquals(successfulUpdates, metrics.getMetric("successful_updates"));
+    assertEquals(updateIgnoreErrors, metrics.getMetric("update_ignore_errors"));
+    assertEquals(successfulDeletes, metrics.getMetric("successful_deletes"));
+    assertEquals(deleteIgnoreErrors, metrics.getMetric("delete_ignore_errors"));
+  }
+
   @Test(timeout = 10000)
   public void testUpsert() throws Exception {
     KuduTable table = client.createTable(tableName, basicSchema, getBasicCreateTableOptions());
@@ -358,6 +376,7 @@ public class TestKuduSession {
         "INT32 key=1, INT32 column1_i=1, INT32 column2_i=3, " +
             "STRING column3_s=a string, BOOL column4_b=true",
         rowStrings.get(0));
+    doVerifyMetrics(session, 0, 0, 1, 0, 0, 0, 0);
 
     // Test an Upsert that acts as an Update.
     assertFalse(session.apply(createUpsert(table, 1, 2, false)).hasRowError());
@@ -366,6 +385,7 @@ public class TestKuduSession {
         "INT32 key=1, INT32 column1_i=2, INT32 column2_i=3, " +
             "STRING column3_s=a string, BOOL column4_b=true",
         rowStrings.get(0));
+    doVerifyMetrics(session, 0, 0, 2, 0, 0, 0, 0);
   }
 
   @Test(timeout = 10000)
@@ -378,6 +398,7 @@ public class TestKuduSession {
     session.apply(createUpsert(table, 1, 1, false));
     session.apply(createInsertIgnore(table, 1));
     List<OperationResponse> results = session.flush();
+    doVerifyMetrics(session, 1, 1, 1, 0, 0, 0, 0);
     for (OperationResponse result : results) {
       assertFalse(result.toString(), result.hasRowError());
     }
@@ -398,6 +419,7 @@ public class TestKuduSession {
     session.apply(createInsertIgnore(table, 1));
     session.apply(createInsert(table, 1));
     List<OperationResponse> results = session.flush();
+    doVerifyMetrics(session, 1, 0, 0, 0, 0, 0, 0);
     assertFalse(results.get(0).toString(), results.get(0).hasRowError());
     assertTrue(results.get(1).toString(), results.get(1).hasRowError());
     assertTrue(results.get(1).getRowError().getErrorStatus().isAlreadyPresent());
@@ -421,6 +443,7 @@ public class TestKuduSession {
             "INT32 key=1, INT32 column1_i=2, INT32 column2_i=3, " +
                     "STRING column3_s=a string, BOOL column4_b=true",
             rowStrings.get(0));
+    doVerifyMetrics(session, 1, 0, 0, 0, 0, 0, 0);
 
     // Test insert ignore does not return a row error.
     assertFalse(session.apply(createInsertIgnore(table, 1)).hasRowError());
@@ -429,6 +452,7 @@ public class TestKuduSession {
             "INT32 key=1, INT32 column1_i=2, INT32 column2_i=3, " +
                     "STRING column3_s=a string, BOOL column4_b=true",
             rowStrings.get(0));
+    doVerifyMetrics(session, 1, 1, 0, 0, 0, 0, 0);
 
   }
 
@@ -440,9 +464,11 @@ public class TestKuduSession {
     // Test update ignore does not return a row error.
     assertFalse(session.apply(createUpdateIgnore(table, 1, 1, false)).hasRowError());
     assertEquals(0, scanTableToStrings(table).size());
+    doVerifyMetrics(session, 0, 0, 0, 0, 1, 0, 0);
 
     assertFalse(session.apply(createInsert(table, 1)).hasRowError());
     assertEquals(1, scanTableToStrings(table).size());
+    doVerifyMetrics(session, 1, 0, 0, 0, 1, 0, 0);
 
     // Test update ignore implements normal update.
     assertFalse(session.apply(createUpdateIgnore(table, 1, 2, false)).hasRowError());
@@ -452,6 +478,7 @@ public class TestKuduSession {
         "INT32 key=1, INT32 column1_i=2, INT32 column2_i=3, " +
             "STRING column3_s=a string, BOOL column4_b=true",
         rowStrings.get(0));
+    doVerifyMetrics(session, 1, 0, 0, 1, 1, 0, 0);
   }
 
   @Test(timeout = 10000)
@@ -461,13 +488,16 @@ public class TestKuduSession {
 
     // Test delete ignore does not return a row error.
     assertFalse(session.apply(createDeleteIgnore(table, 1)).hasRowError());
+    doVerifyMetrics(session, 0, 0, 0, 0, 0, 0, 1);
 
     assertFalse(session.apply(createInsert(table, 1)).hasRowError());
     assertEquals(1, scanTableToStrings(table).size());
+    doVerifyMetrics(session, 1, 0, 0, 0, 0, 0, 1);
 
     // Test delete ignore implements normal delete.
     assertFalse(session.apply(createDeleteIgnore(table, 1)).hasRowError());
     assertEquals(0, scanTableToStrings(table).size());
+    doVerifyMetrics(session, 1, 0, 0, 0, 0, 1, 1);
   }
 
   @Test(timeout = 10000)