You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mi...@apache.org on 2018/01/30 23:39:34 UTC

[beam] branch master updated: [BEAM-3525] Fix KafkaIO metric (#4524)

This is an automated email from the ASF dual-hosted git repository.

mingmxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 34d8bc9  [BEAM-3525] Fix KafkaIO metric (#4524)
34d8bc9 is described below

commit 34d8bc9079a5a4b3d3a5476b2d4b48072be0dff4
Author: Raghu Angadi <ra...@apache.org>
AuthorDate: Tue Jan 30 15:39:29 2018 -0800

    [BEAM-3525] Fix KafkaIO metric (#4524)
    
    * Fix a KafkaIO metric
    
    A couple of fixes:
    
    'checkpointMarkCommits' was incremented outside a Beam API context (inside consumerPollLoop),
    which is not supported. Instead, increment a new metric 'enqueued'. 'enqueued' - 'skipped'
    gives the total number of actual commits.
    
    Reverted an earlier PR that removed a test dependency. The dependency is required in order
    to see test output.
    
    * Verify commits metric in unit test.
---
 sdks/java/io/kafka/pom.xml                             |  6 ++++++
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java     | 18 +++++++++---------
 .../java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java | 14 ++++++++++++++
 3 files changed, 29 insertions(+), 9 deletions(-)

diff --git a/sdks/java/io/kafka/pom.xml b/sdks/java/io/kafka/pom.xml
index 38afa00..b04f5bf 100644
--- a/sdks/java/io/kafka/pom.xml
+++ b/sdks/java/io/kafka/pom.xml
@@ -127,5 +127,11 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 83d702c..996a460 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -911,9 +911,9 @@ public class KafkaIO {
     @VisibleForTesting
     static final String METRIC_NAMESPACE = "KafkaIOReader";
     @VisibleForTesting
-    static final String CHECKPOINT_MARK_COMMITS_METRIC = "checkpointMarkCommits";
-    private static final String CHECKPOINT_MARK_COMMIT_SKIPS_METRIC = "checkpointMarkCommitSkips";
-
+    static final String CHECKPOINT_MARK_COMMITS_ENQUEUED_METRIC = "checkpointMarkCommitsEnqueued";
+    private static final String CHECKPOINT_MARK_COMMITS_SKIPPED_METRIC =
+      "checkpointMarkCommitsSkipped";
 
     private final UnboundedKafkaSource<K, V> source;
     private final String name;
@@ -932,11 +932,11 @@ public class KafkaIO {
     private final Counter bytesReadBySplit;
     private final Gauge backlogBytesOfSplit;
     private final Gauge backlogElementsOfSplit;
-    private final Counter checkpointMarkCommits = Metrics.counter(
-      METRIC_NAMESPACE, CHECKPOINT_MARK_COMMITS_METRIC);
+    private final Counter checkpointMarkCommitsEnqueued = Metrics.counter(
+      METRIC_NAMESPACE, CHECKPOINT_MARK_COMMITS_ENQUEUED_METRIC);
     // Checkpoint marks skipped in favor of newer mark (only the latest needs to be committed).
-    private final Counter checkpointMarkCommitSkips = Metrics.counter(
-      METRIC_NAMESPACE, CHECKPOINT_MARK_COMMIT_SKIPS_METRIC);
+    private final Counter checkpointMarkCommitsSkipped = Metrics.counter(
+      METRIC_NAMESPACE, CHECKPOINT_MARK_COMMITS_SKIPPED_METRIC);
 
     /**
      * The poll timeout while reading records from Kafka.
@@ -1130,7 +1130,6 @@ public class KafkaIO {
             p -> new OffsetAndMetadata(p.getNextOffset())
           ))
       );
-      checkpointMarkCommits.inc();
     }
 
     /**
@@ -1142,8 +1141,9 @@ public class KafkaIO {
      */
     void finalizeCheckpointMarkAsync(KafkaCheckpointMark checkpointMark) {
       if (finalizedCheckpointMark.getAndSet(checkpointMark) != null) {
-        checkpointMarkCommitSkips.inc();
+        checkpointMarkCommitsSkipped.inc();
       }
+      checkpointMarkCommitsEnqueued.inc();
     }
 
     private void nextBatch() {
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 08b9d7c..08338d8 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.kafka;
 
 import static org.apache.beam.sdk.metrics.MetricResultsMatchers.attemptedMetricsResult;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -714,6 +715,19 @@ public class KafkaIOTest {
 
     // since gauge values may be inconsistent in some environments assert only on their existence.
     assertThat(backlogBytesMetrics.gauges(), IsIterableWithSize.iterableWithSize(1));
+
+    // Check checkpointMarkCommitsEnqueued metric.
+    MetricQueryResults commitsEnqueuedMetrics =
+        result.metrics().queryMetrics(
+            MetricsFilter.builder()
+                .addNameFilter(
+                    MetricNameFilter.named(
+                        KafkaIO.UnboundedKafkaReader.METRIC_NAMESPACE,
+                        KafkaIO.UnboundedKafkaReader.CHECKPOINT_MARK_COMMITS_ENQUEUED_METRIC))
+                .build());
+
+    assertThat(commitsEnqueuedMetrics.counters(), IsIterableWithSize.iterableWithSize(1));
+    assertThat(commitsEnqueuedMetrics.counters().iterator().next().attempted(), greaterThan(0L));
   }
 
   @Test

-- 
To stop receiving notification emails like this one, please contact
mingmxu@apache.org.