Posted to commits@beam.apache.org by bo...@apache.org on 2021/04/23 21:22:08 UTC

[beam] branch master updated: [BEAM-12193] Add user metrics to show found TopicPartition

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5fd434a  [BEAM-12193] Add user metrics to show found TopicPartition
     new baa106e  Merge pull request #14570 from [BEAM-12193] Add user metrics to show found TopicPartition
5fd434a is described below

commit 5fd434a4e18089148efa86e63573ee455d940be2
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Fri Apr 16 21:30:50 2021 -0700

    [BEAM-12193] Add user metrics to show found TopicPartition
---
 .../apache/beam/sdk/io/kafka/TopicPartitionCoder.java  |  3 ++-
 .../sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java     | 16 ++++++++++++++--
 .../beam/sdk/io/kafka/TopicPartitionCoderTest.java     |  6 ++++++
 .../sdk/io/kafka/WatchKafkaTopicPartitionDoFnTest.java | 18 ++++++------------
 4 files changed, 28 insertions(+), 15 deletions(-)
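
For readers skimming the patch, the user-metric pattern being added is Beam's
standard counter lookup: Metrics.counter(namespace, name) returns a Counter that
is incremented each time a TopicPartition is discovered. Below is a minimal,
hypothetical sketch of that pattern outside the KafkaIO code; the DoFn name and
the String element type are illustrative only and not part of this commit.

    import org.apache.beam.sdk.metrics.Counter;
    import org.apache.beam.sdk.metrics.Metrics;
    import org.apache.beam.sdk.transforms.DoFn;

    // Illustrative DoFn: bump a per-name counter for every element seen.
    class TrackDiscoveredPartitionsFn extends DoFn<String, String> {
      private static final String COUNTER_NAMESPACE = "watch_kafka_topic_partition";

      @ProcessElement
      public void processElement(@Element String topicPartitionName, OutputReceiver<String> out) {
        // One counter per discovered TopicPartition name, e.g. "topic1-0".
        Counter found = Metrics.counter(COUNTER_NAMESPACE, topicPartitionName);
        found.inc();
        out.output(topicPartitionName);
      }
    }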

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TopicPartitionCoder.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TopicPartitionCoder.java
index f11e8ca..4868dc2 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TopicPartitionCoder.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TopicPartitionCoder.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.kafka.common.TopicPartition;
 
 /** The {@link Coder} for encoding and decoding {@link TopicPartition} in Beam. */
@@ -48,7 +49,7 @@ public class TopicPartitionCoder extends StructuredCoder<TopicPartition> {
 
   @Override
   public List<? extends Coder<?>> getCoderArguments() {
-    return null;
+    return ImmutableList.of(StringUtf8Coder.of(), VarIntCoder.of());
   }
 
   @Override
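
For context, the getCoderArguments() change above makes the coder report its
component coders (StringUtf8Coder for the topic name, VarIntCoder for the
partition number) instead of returning null. A rough sketch of exercising the
coder directly, assuming the no-arg TopicPartitionCoder constructor is
accessible from the calling code:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import org.apache.beam.sdk.io.kafka.TopicPartitionCoder;
    import org.apache.kafka.common.TopicPartition;

    public class TopicPartitionCoderRoundTrip {
      public static void main(String[] args) throws Exception {
        TopicPartitionCoder coder = new TopicPartitionCoder();
        TopicPartition original = new TopicPartition("topic1", 0);

        // Encode to bytes, then decode back.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        coder.encode(original, bytes);
        TopicPartition decoded =
            coder.decode(new ByteArrayInputStream(bytes.toByteArray()));

        // With this patch, getCoderArguments() lists the topic and partition
        // component coders rather than returning null.
        System.out.println(coder.getCoderArguments());
        System.out.println(original.equals(decoded)); // true
      }
    }
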
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java
index d82bfcf..fc9cc62 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java
@@ -22,6 +22,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.StateSpecs;
@@ -62,6 +64,8 @@ class WatchKafkaTopicPartitionDoFn extends DoFn<KV<byte[], byte[]>, KafkaSourceD
   private final Map<String, Object> kafkaConsumerConfig;
   private final Instant startReadTime;
 
+  private static final String COUNTER_NAMESPACE = "watch_kafka_topic_partition";
+
   WatchKafkaTopicPartitionDoFn(
       Duration checkDuration,
       SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> kafkaConsumerFactoryFn,
@@ -85,6 +89,7 @@ class WatchKafkaTopicPartitionDoFn extends DoFn<KV<byte[], byte[]>, KafkaSourceD
   @VisibleForTesting
   Set<TopicPartition> getAllTopicPartitions() {
     Set<TopicPartition> current = new HashSet<>();
+    // TODO(BEAM-12192): Respect given topics from KafkaIO.
     try (Consumer<byte[], byte[]> kafkaConsumer =
         kafkaConsumerFactoryFn.apply(kafkaConsumerConfig)) {
       for (Map.Entry<String, List<PartitionInfo>> topicInfo :
@@ -107,13 +112,17 @@ class WatchKafkaTopicPartitionDoFn extends DoFn<KV<byte[], byte[]>, KafkaSourceD
     current.forEach(
         topicPartition -> {
           if (checkStopReadingFn == null || !checkStopReadingFn.apply(topicPartition)) {
+            Counter foundedTopicPartition =
+                Metrics.counter(COUNTER_NAMESPACE, topicPartition.toString());
+            foundedTopicPartition.inc();
             existingTopicPartitions.add(topicPartition);
             outputReceiver.output(
                 KafkaSourceDescriptor.of(topicPartition, null, startReadTime, null));
           }
         });
 
-    timer.set(Instant.now().plus(checkDuration.getMillis()));
+    timer.offset(checkDuration).setRelative();
+    ;
   }
 
   @OnTimer(TIMER_ID)
@@ -130,13 +139,16 @@ class WatchKafkaTopicPartitionDoFn extends DoFn<KV<byte[], byte[]>, KafkaSourceD
             });
     existingTopicPartitions.clear();
 
-    Set<TopicPartition> currentAll = getAllTopicPartitions();
+    Set<TopicPartition> currentAll = this.getAllTopicPartitions();
 
     // Emit new added TopicPartitions.
     Set<TopicPartition> newAdded = Sets.difference(currentAll, readingTopicPartitions);
     newAdded.forEach(
         topicPartition -> {
           if (checkStopReadingFn == null || !checkStopReadingFn.apply(topicPartition)) {
+            Counter foundedTopicPartition =
+                Metrics.counter(COUNTER_NAMESPACE, topicPartition.toString());
+            foundedTopicPartition.inc();
             outputReceiver.output(
                 KafkaSourceDescriptor.of(topicPartition, null, startReadTime, null));
           }
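
The other behavioral tweak in WatchKafkaTopicPartitionDoFn.java above is the
move from an absolute timer, timer.set(Instant.now().plus(...)), to Beam's
relative-timer idiom, timer.offset(checkDuration).setRelative(). A hypothetical
looping-timer DoFn illustrating that idiom (the class name, timer id, and
duration here are made up for the example):

    import org.apache.beam.sdk.state.TimeDomain;
    import org.apache.beam.sdk.state.Timer;
    import org.apache.beam.sdk.state.TimerSpec;
    import org.apache.beam.sdk.state.TimerSpecs;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.joda.time.Duration;

    class PeriodicCheckFn extends DoFn<KV<String, String>, String> {
      private static final String TIMER_ID = "check_timer";
      private final Duration checkDuration = Duration.standardSeconds(600);

      @TimerId(TIMER_ID)
      private final TimerSpec checkTimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

      @ProcessElement
      public void processElement(@TimerId(TIMER_ID) Timer timer) {
        // Arm the timer checkDuration after the current processing time; no
        // direct Instant.now() call, which also keeps tests free of static mocks.
        timer.offset(checkDuration).setRelative();
      }

      @OnTimer(TIMER_ID)
      public void onTimer(@TimerId(TIMER_ID) Timer timer, OutputReceiver<String> out) {
        out.output("periodic check fired");
        timer.offset(checkDuration).setRelative(); // re-arm for the next round
      }
    }
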
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/TopicPartitionCoderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/TopicPartitionCoderTest.java
index 01c5acd..55cd957 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/TopicPartitionCoderTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/TopicPartitionCoderTest.java
@@ -36,4 +36,10 @@ public class TopicPartitionCoderTest {
     assertEquals(
         topicPartition, coder.decode(new ByteArrayInputStream(outputStream.toByteArray())));
   }
+
+  @Test
+  public void testToString() throws Exception {
+    TopicPartitionCoder coder = new TopicPartitionCoder();
+    assertEquals("TopicPartitionCoder(StringUtf8Coder,VarIntCoder)", coder.toString());
+  }
 }
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFnTest.java
index 14460d6..dc3d6b5 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFnTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFnTest.java
@@ -99,12 +99,10 @@ public class WatchKafkaTopicPartitionDoFnTest {
 
     when(mockConsumer.listTopics()).thenReturn(ImmutableMap.of());
     MockBagState bagState = new MockBagState(ImmutableList.of());
-    Instant now = Instant.EPOCH;
-    mockStatic(Instant.class);
-    when(Instant.now()).thenReturn(now);
 
+    when(timer.offset(Duration.millis(600L))).thenReturn(timer);
     dofnInstance.processElement(timer, bagState, outputReceiver);
-    verify(timer, times(1)).set(now.plus(600L));
+    verify(timer, times(1)).setRelative();
     assertTrue(outputReceiver.getOutputs().isEmpty());
     assertTrue(bagState.getCurrentStates().isEmpty());
   }
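
On the test side, the static mocking of Instant.now() is replaced by stubbing
the fluent timer call so that the chained setRelative() lands on the same mock.
Roughly, the Mockito pattern in the hunks above and below looks like this
(standalone sketch, not the actual test class):

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.times;
    import static org.mockito.Mockito.verify;
    import static org.mockito.Mockito.when;

    import org.apache.beam.sdk.state.Timer;
    import org.joda.time.Duration;

    public class RelativeTimerMockSketch {
      public static void main(String[] args) {
        Timer timer = mock(Timer.class);
        Duration checkDuration = Duration.millis(600L);

        // offset(...) returns the same mock so setRelative() is recorded on it.
        when(timer.offset(checkDuration)).thenReturn(timer);

        // The code under test calls timer.offset(checkDuration).setRelative().
        timer.offset(checkDuration).setRelative();

        // Verify the relative set without mocking any static time source.
        verify(timer, times(1)).setRelative();
      }
    }
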
@@ -129,13 +127,11 @@ public class WatchKafkaTopicPartitionDoFnTest {
                     new PartitionInfo("topic2", 0, null, null, null),
                     new PartitionInfo("topic2", 1, null, null, null))));
     MockBagState bagState = new MockBagState(ImmutableList.of());
-    Instant now = Instant.EPOCH;
-    mockStatic(Instant.class);
-    when(Instant.now()).thenReturn(now);
 
+    when(timer.offset(Duration.millis(600L))).thenReturn(timer);
     dofnInstance.processElement(timer, bagState, outputReceiver);
 
-    verify(timer, times(1)).set(now.plus(600L));
+    verify(timer, times(1)).setRelative();
     Set<TopicPartition> expectedOutputTopicPartitions =
         ImmutableSet.of(
             new TopicPartition("topic1", 0),
@@ -182,13 +178,11 @@ public class WatchKafkaTopicPartitionDoFnTest {
                     new PartitionInfo("topic2", 0, null, null, null),
                     new PartitionInfo("topic2", 1, null, null, null))));
     MockBagState bagState = new MockBagState(ImmutableList.of());
-    Instant now = Instant.EPOCH;
-    mockStatic(Instant.class);
-    when(Instant.now()).thenReturn(now);
 
+    when(timer.offset(Duration.millis(600L))).thenReturn(timer);
     dofnInstance.processElement(timer, bagState, outputReceiver);
+    verify(timer, times(1)).setRelative();
 
-    verify(timer, times(1)).set(now.plus(600L));
     Set<TopicPartition> expectedOutputTopicPartitions =
         ImmutableSet.of(
             new TopicPartition("topic1", 0),