You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2021/04/23 21:22:08 UTC
[beam] branch master updated: [BEAM-12193] Add user metrics to show founded TopicPartition
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5fd434a [BEAM-12193] Add user metrics to show founded TopicPartition
new baa106e Merge pull request #14570 from [BEAM-12193] Add user metrics to show founded TopicPartition
5fd434a is described below
commit 5fd434a4e18089148efa86e63573ee455d940be2
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Fri Apr 16 21:30:50 2021 -0700
[BEAM-12193] Add user metrics to show founded TopicPartition
---
.../apache/beam/sdk/io/kafka/TopicPartitionCoder.java | 3 ++-
.../sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java | 16 ++++++++++++++--
.../beam/sdk/io/kafka/TopicPartitionCoderTest.java | 6 ++++++
.../sdk/io/kafka/WatchKafkaTopicPartitionDoFnTest.java | 18 ++++++------------
4 files changed, 28 insertions(+), 15 deletions(-)
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TopicPartitionCoder.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TopicPartitionCoder.java
index f11e8ca..4868dc2 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TopicPartitionCoder.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TopicPartitionCoder.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.kafka.common.TopicPartition;
/** The {@link Coder} for encoding and decoding {@link TopicPartition} in Beam. */
@@ -48,7 +49,7 @@ public class TopicPartitionCoder extends StructuredCoder<TopicPartition> {
@Override
public List<? extends Coder<?>> getCoderArguments() {
- return null;
+ return ImmutableList.of(StringUtf8Coder.of(), VarIntCoder.of());
}
@Override
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java
index d82bfcf..fc9cc62 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java
@@ -22,6 +22,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
@@ -62,6 +64,8 @@ class WatchKafkaTopicPartitionDoFn extends DoFn<KV<byte[], byte[]>, KafkaSourceD
private final Map<String, Object> kafkaConsumerConfig;
private final Instant startReadTime;
+ private static final String COUNTER_NAMESPACE = "watch_kafka_topic_partition";
+
WatchKafkaTopicPartitionDoFn(
Duration checkDuration,
SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> kafkaConsumerFactoryFn,
@@ -85,6 +89,7 @@ class WatchKafkaTopicPartitionDoFn extends DoFn<KV<byte[], byte[]>, KafkaSourceD
@VisibleForTesting
Set<TopicPartition> getAllTopicPartitions() {
Set<TopicPartition> current = new HashSet<>();
+ // TODO(BEAM-12192): Respect given topics from KafkaIO.
try (Consumer<byte[], byte[]> kafkaConsumer =
kafkaConsumerFactoryFn.apply(kafkaConsumerConfig)) {
for (Map.Entry<String, List<PartitionInfo>> topicInfo :
@@ -107,13 +112,17 @@ class WatchKafkaTopicPartitionDoFn extends DoFn<KV<byte[], byte[]>, KafkaSourceD
current.forEach(
topicPartition -> {
if (checkStopReadingFn == null || !checkStopReadingFn.apply(topicPartition)) {
+ Counter foundedTopicPartition =
+ Metrics.counter(COUNTER_NAMESPACE, topicPartition.toString());
+ foundedTopicPartition.inc();
existingTopicPartitions.add(topicPartition);
outputReceiver.output(
KafkaSourceDescriptor.of(topicPartition, null, startReadTime, null));
}
});
- timer.set(Instant.now().plus(checkDuration.getMillis()));
+ timer.offset(checkDuration).setRelative();
+ ;
}
@OnTimer(TIMER_ID)
@@ -130,13 +139,16 @@ class WatchKafkaTopicPartitionDoFn extends DoFn<KV<byte[], byte[]>, KafkaSourceD
});
existingTopicPartitions.clear();
- Set<TopicPartition> currentAll = getAllTopicPartitions();
+ Set<TopicPartition> currentAll = this.getAllTopicPartitions();
// Emit new added TopicPartitions.
Set<TopicPartition> newAdded = Sets.difference(currentAll, readingTopicPartitions);
newAdded.forEach(
topicPartition -> {
if (checkStopReadingFn == null || !checkStopReadingFn.apply(topicPartition)) {
+ Counter foundedTopicPartition =
+ Metrics.counter(COUNTER_NAMESPACE, topicPartition.toString());
+ foundedTopicPartition.inc();
outputReceiver.output(
KafkaSourceDescriptor.of(topicPartition, null, startReadTime, null));
}
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/TopicPartitionCoderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/TopicPartitionCoderTest.java
index 01c5acd..55cd957 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/TopicPartitionCoderTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/TopicPartitionCoderTest.java
@@ -36,4 +36,10 @@ public class TopicPartitionCoderTest {
assertEquals(
topicPartition, coder.decode(new ByteArrayInputStream(outputStream.toByteArray())));
}
+
+ @Test
+ public void testToString() throws Exception {
+ TopicPartitionCoder coder = new TopicPartitionCoder();
+ assertEquals("TopicPartitionCoder(StringUtf8Coder,VarIntCoder)", coder.toString());
+ }
}
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFnTest.java
index 14460d6..dc3d6b5 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFnTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFnTest.java
@@ -99,12 +99,10 @@ public class WatchKafkaTopicPartitionDoFnTest {
when(mockConsumer.listTopics()).thenReturn(ImmutableMap.of());
MockBagState bagState = new MockBagState(ImmutableList.of());
- Instant now = Instant.EPOCH;
- mockStatic(Instant.class);
- when(Instant.now()).thenReturn(now);
+ when(timer.offset(Duration.millis(600L))).thenReturn(timer);
dofnInstance.processElement(timer, bagState, outputReceiver);
- verify(timer, times(1)).set(now.plus(600L));
+ verify(timer, times(1)).setRelative();
assertTrue(outputReceiver.getOutputs().isEmpty());
assertTrue(bagState.getCurrentStates().isEmpty());
}
@@ -129,13 +127,11 @@ public class WatchKafkaTopicPartitionDoFnTest {
new PartitionInfo("topic2", 0, null, null, null),
new PartitionInfo("topic2", 1, null, null, null))));
MockBagState bagState = new MockBagState(ImmutableList.of());
- Instant now = Instant.EPOCH;
- mockStatic(Instant.class);
- when(Instant.now()).thenReturn(now);
+ when(timer.offset(Duration.millis(600L))).thenReturn(timer);
dofnInstance.processElement(timer, bagState, outputReceiver);
- verify(timer, times(1)).set(now.plus(600L));
+ verify(timer, times(1)).setRelative();
Set<TopicPartition> expectedOutputTopicPartitions =
ImmutableSet.of(
new TopicPartition("topic1", 0),
@@ -182,13 +178,11 @@ public class WatchKafkaTopicPartitionDoFnTest {
new PartitionInfo("topic2", 0, null, null, null),
new PartitionInfo("topic2", 1, null, null, null))));
MockBagState bagState = new MockBagState(ImmutableList.of());
- Instant now = Instant.EPOCH;
- mockStatic(Instant.class);
- when(Instant.now()).thenReturn(now);
+ when(timer.offset(Duration.millis(600L))).thenReturn(timer);
dofnInstance.processElement(timer, bagState, outputReceiver);
+ verify(timer, times(1)).setRelative();
- verify(timer, times(1)).set(now.plus(600L));
Set<TopicPartition> expectedOutputTopicPartitions =
ImmutableSet.of(
new TopicPartition("topic1", 0),