You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mi...@apache.org on 2018/01/30 23:39:34 UTC
[beam] branch master updated: [BEAM-3525] Fix KafkaIO metric (#4524)
This is an automated email from the ASF dual-hosted git repository.
mingmxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 34d8bc9 [BEAM-3525] Fix KafkaIO metric (#4524)
34d8bc9 is described below
commit 34d8bc9079a5a4b3d3a5476b2d4b48072be0dff4
Author: Raghu Angadi <ra...@apache.org>
AuthorDate: Tue Jan 30 15:39:29 2018 -0800
[BEAM-3525] Fix KafkaIO metric (#4524)
* Fix a KafkaIO metric
A couple of fixes:
'checkpointMarkCommits' was incremented outside a Beam API context (inside consumerPollLoop),
which is not supported. Instead, increment a new metric 'enqueued'. 'enqueued' - 'skipped' gives
the total number of actual commits.
Reverted an earlier PR that removed a test dependency. The dependency is required for tests
in order to see test output.
* Verify commits metric in unit test.
---
sdks/java/io/kafka/pom.xml | 6 ++++++
.../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 18 +++++++++---------
.../java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java | 14 ++++++++++++++
3 files changed, 29 insertions(+), 9 deletions(-)
diff --git a/sdks/java/io/kafka/pom.xml b/sdks/java/io/kafka/pom.xml
index 38afa00..b04f5bf 100644
--- a/sdks/java/io/kafka/pom.xml
+++ b/sdks/java/io/kafka/pom.xml
@@ -127,5 +127,11 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 83d702c..996a460 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -911,9 +911,9 @@ public class KafkaIO {
@VisibleForTesting
static final String METRIC_NAMESPACE = "KafkaIOReader";
@VisibleForTesting
- static final String CHECKPOINT_MARK_COMMITS_METRIC = "checkpointMarkCommits";
- private static final String CHECKPOINT_MARK_COMMIT_SKIPS_METRIC = "checkpointMarkCommitSkips";
-
+ static final String CHECKPOINT_MARK_COMMITS_ENQUEUED_METRIC = "checkpointMarkCommitsEnqueued";
+ private static final String CHECKPOINT_MARK_COMMITS_SKIPPED_METRIC =
+ "checkpointMarkCommitsSkipped";
private final UnboundedKafkaSource<K, V> source;
private final String name;
@@ -932,11 +932,11 @@ public class KafkaIO {
private final Counter bytesReadBySplit;
private final Gauge backlogBytesOfSplit;
private final Gauge backlogElementsOfSplit;
- private final Counter checkpointMarkCommits = Metrics.counter(
- METRIC_NAMESPACE, CHECKPOINT_MARK_COMMITS_METRIC);
+ private final Counter checkpointMarkCommitsEnqueued = Metrics.counter(
+ METRIC_NAMESPACE, CHECKPOINT_MARK_COMMITS_ENQUEUED_METRIC);
// Checkpoint marks skipped in favor of newer mark (only the latest needs to be committed).
- private final Counter checkpointMarkCommitSkips = Metrics.counter(
- METRIC_NAMESPACE, CHECKPOINT_MARK_COMMIT_SKIPS_METRIC);
+ private final Counter checkpointMarkCommitsSkipped = Metrics.counter(
+ METRIC_NAMESPACE, CHECKPOINT_MARK_COMMITS_SKIPPED_METRIC);
/**
* The poll timeout while reading records from Kafka.
@@ -1130,7 +1130,6 @@ public class KafkaIO {
p -> new OffsetAndMetadata(p.getNextOffset())
))
);
- checkpointMarkCommits.inc();
}
/**
@@ -1142,8 +1141,9 @@ public class KafkaIO {
*/
void finalizeCheckpointMarkAsync(KafkaCheckpointMark checkpointMark) {
if (finalizedCheckpointMark.getAndSet(checkpointMark) != null) {
- checkpointMarkCommitSkips.inc();
+ checkpointMarkCommitsSkipped.inc();
}
+ checkpointMarkCommitsEnqueued.inc();
}
private void nextBatch() {
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 08b9d7c..08338d8 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.kafka;
import static org.apache.beam.sdk.metrics.MetricResultsMatchers.attemptedMetricsResult;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -714,6 +715,19 @@ public class KafkaIOTest {
// since gauge values may be inconsistent in some environments assert only on their existence.
assertThat(backlogBytesMetrics.gauges(), IsIterableWithSize.iterableWithSize(1));
+
+ // Check checkpointMarkCommitsEnqueued metric.
+ MetricQueryResults commitsEnqueuedMetrics =
+ result.metrics().queryMetrics(
+ MetricsFilter.builder()
+ .addNameFilter(
+ MetricNameFilter.named(
+ KafkaIO.UnboundedKafkaReader.METRIC_NAMESPACE,
+ KafkaIO.UnboundedKafkaReader.CHECKPOINT_MARK_COMMITS_ENQUEUED_METRIC))
+ .build());
+
+ assertThat(commitsEnqueuedMetrics.counters(), IsIterableWithSize.iterableWithSize(1));
+ assertThat(commitsEnqueuedMetrics.counters().iterator().next().attempted(), greaterThan(0L));
}
@Test
--
To stop receiving notification emails like this one, please contact
mingmxu@apache.org.