Posted to commits@beam.apache.org by bo...@apache.org on 2021/04/23 23:24:46 UTC

[beam] branch master updated: [BEAM-12192] Have WatchKafkaTopicPartitionDoFn respect topics given by KafkaIO

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f3b188  [BEAM-12192] Have WatchKafkaTopicPartitionDoFn respect topics given by KafkaIO
     new 455dd10  Merge pull request #14580 from [BEAM-12192] Have WatchKafkaTopicPartitionDoFn respect topics given by KafkaIO
2f3b188 is described below

commit 2f3b18888849369c06abd38f4b2feb73f1d6b2d8
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Mon Apr 19 15:06:05 2021 -0700

    [BEAM-12192] Have WatchKafkaTopicPartitionDoFn respect topics given by KafkaIO
---
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 28 +++++++++++---
 .../sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java | 25 ++++++++----
 .../io/kafka/WatchKafkaTopicPartitionDoFnTest.java | 45 ++++++++++++++++++----
 3 files changed, 77 insertions(+), 21 deletions(-)
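
For context, a minimal sketch of the read configuration this change affects;
the broker address and topic names are placeholders, not part of the commit:

    import java.util.Arrays;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.joda.time.Duration;

    public class DynamicReadWithTopics {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();
        pipeline.apply(
            KafkaIO.<byte[], byte[]>read()
                .withBootstrapServers("broker:9092") // placeholder address
                .withTopics(Arrays.asList("topic1", "topic2"))
                .withKeyDeserializer(ByteArrayDeserializer.class)
                .withValueDeserializer(ByteArrayDeserializer.class)
                // Dynamic read installs WatchKafkaTopicPartitionDoFn, which
                // re-checks partition metadata at the given interval; with this
                // commit it watches only topic1/topic2 instead of every topic
                // visible to the consumer.
                .withDynamicRead(Duration.standardMinutes(5)));
        pipeline.run();
      }
    }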

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 8b6058c..29d8bac 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -29,10 +29,12 @@ import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -581,9 +583,9 @@ public class KafkaIO {
       extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
     abstract Map<String, Object> getConsumerConfig();
 
-    abstract List<String> getTopics();
+    abstract @Nullable List<String> getTopics();
 
-    abstract List<TopicPartition> getTopicPartitions();
+    abstract @Nullable List<TopicPartition> getTopicPartitions();
 
     abstract @Nullable Coder<K> getKeyCoder();
 
@@ -839,7 +841,8 @@ public class KafkaIO {
      */
     public Read<K, V> withTopics(List<String> topics) {
       checkState(
-          getTopicPartitions().isEmpty(), "Only topics or topicPartitions can be set, not both");
+          getTopicPartitions() == null || getTopicPartitions().isEmpty(),
+          "Only topics or topicPartitions can be set, not both");
       return toBuilder().setTopics(ImmutableList.copyOf(topics)).build();
     }
 
@@ -851,7 +854,9 @@ public class KafkaIO {
      * partitions are distributed among the splits.
      */
     public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
-      checkState(getTopics().isEmpty(), "Only topics or topicPartitions can be set, not both");
+      checkState(
+          getTopics() == null || getTopics().isEmpty(),
+          "Only topics or topicPartitions can be set, not both");
       return toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
     }
 
@@ -1170,7 +1175,8 @@ public class KafkaIO {
       // construction time. But it requires enabling beam_fn_api.
       if (!isDynamicRead()) {
         checkArgument(
-            getTopics().size() > 0 || getTopicPartitions().size() > 0,
+            (getTopics() != null && getTopics().size() > 0)
+                || (getTopicPartitions() != null && getTopicPartitions().size() > 0),
             "Either withTopic(), withTopics() or withTopicPartitions() is required");
       } else {
         checkArgument(
@@ -1327,6 +1333,15 @@ public class KafkaIO {
         }
         PCollection<KafkaSourceDescriptor> output;
         if (kafkaRead.isDynamicRead()) {
+          Set<String> topics = new HashSet<>();
+          if (kafkaRead.getTopics() != null && kafkaRead.getTopics().size() > 0) {
+            topics.addAll(kafkaRead.getTopics());
+          }
+          if (kafkaRead.getTopicPartitions() != null && kafkaRead.getTopicPartitions().size() > 0) {
+            for (TopicPartition topicPartition : kafkaRead.getTopicPartitions()) {
+              topics.add(topicPartition.topic());
+            }
+          }
           output =
               input
                   .getPipeline()
@@ -1343,7 +1358,8 @@ public class KafkaIO {
                               kafkaRead.getConsumerFactoryFn(),
                               kafkaRead.getCheckStopReadingFn(),
                               kafkaRead.getConsumerConfig(),
-                              kafkaRead.getStartReadTime())));
+                              kafkaRead.getStartReadTime(),
+                              topics.stream().collect(Collectors.toList()))));
 
         } else {
           output =
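
The KafkaIO.java hunk above unions the configured topics with the topics named
by any configured TopicPartitions before handing the result to
WatchKafkaTopicPartitionDoFn. A standalone sketch of that union logic; the
class and method names here are illustrative, not part of the commit:

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import org.apache.kafka.common.TopicPartition;

    final class TopicUnion {
      // Either list may be null now that the getters are @Nullable.
      static List<String> watchedTopics(
          List<String> topics, List<TopicPartition> topicPartitions) {
        Set<String> union = new HashSet<>();
        if (topics != null) {
          union.addAll(topics);
        }
        if (topicPartitions != null) {
          // A TopicPartition pins a partition of a topic; for watching
          // purposes only the topic name matters.
          for (TopicPartition tp : topicPartitions) {
            union.add(tp.topic());
          }
        }
        return new ArrayList<>(union);
      }
    }
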
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java
index fc9cc62..f021e36 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java
@@ -66,17 +66,21 @@ class WatchKafkaTopicPartitionDoFn extends DoFn<KV<byte[], byte[]>, KafkaSourceD
 
   private static final String COUNTER_NAMESPACE = "watch_kafka_topic_partition";
 
+  private final List<String> topics;
+
   WatchKafkaTopicPartitionDoFn(
       Duration checkDuration,
       SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> kafkaConsumerFactoryFn,
       SerializableFunction<TopicPartition, Boolean> checkStopReadingFn,
       Map<String, Object> kafkaConsumerConfig,
-      Instant startReadTime) {
+      Instant startReadTime,
+      List<String> topics) {
     this.checkDuration = checkDuration == null ? DEFAULT_CHECK_DURATION : checkDuration;
     this.kafkaConsumerFactoryFn = kafkaConsumerFactoryFn;
     this.checkStopReadingFn = checkStopReadingFn;
     this.kafkaConsumerConfig = kafkaConsumerConfig;
     this.startReadTime = startReadTime;
+    this.topics = topics;
   }
 
   @TimerId(TIMER_ID)
@@ -89,13 +93,21 @@ class WatchKafkaTopicPartitionDoFn extends DoFn<KV<byte[], byte[]>, KafkaSourceD
   @VisibleForTesting
   Set<TopicPartition> getAllTopicPartitions() {
     Set<TopicPartition> current = new HashSet<>();
-    // TODO(BEAM-12192): Respect given topics from KafkaIO.
     try (Consumer<byte[], byte[]> kafkaConsumer =
         kafkaConsumerFactoryFn.apply(kafkaConsumerConfig)) {
-      for (Map.Entry<String, List<PartitionInfo>> topicInfo :
-          kafkaConsumer.listTopics().entrySet()) {
-        for (PartitionInfo partition : topicInfo.getValue()) {
-          current.add(new TopicPartition(topicInfo.getKey(), partition.partition()));
+      if (topics != null && !topics.isEmpty()) {
+        for (String topic : topics) {
+          for (PartitionInfo partition : kafkaConsumer.partitionsFor(topic)) {
+            current.add(new TopicPartition(topic, partition.partition()));
+          }
+        }
+
+      } else {
+        for (Map.Entry<String, List<PartitionInfo>> topicInfo :
+            kafkaConsumer.listTopics().entrySet()) {
+          for (PartitionInfo partition : topicInfo.getValue()) {
+            current.add(new TopicPartition(topicInfo.getKey(), partition.partition()));
+          }
         }
       }
     }
@@ -122,7 +134,6 @@ class WatchKafkaTopicPartitionDoFn extends DoFn<KV<byte[], byte[]>, KafkaSourceD
         });
 
     timer.offset(checkDuration).setRelative();
-    ;
   }
 
   @OnTimer(TIMER_ID)
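
The core of the fix is above: when topics are given, the DoFn queries metadata
for just those topics via Consumer.partitionsFor(topic), and only falls back to
enumerating the whole cluster with listTopics() when none are given. A sketch
of the two discovery paths against a plain Kafka consumer; the broker address
and topic name are placeholders:

    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class DiscoveryPaths {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092"); // placeholder
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        Set<TopicPartition> current = new HashSet<>();
        try (Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
          // Given-topics path: fetches metadata for the named topic only.
          for (PartitionInfo p : consumer.partitionsFor("topic1")) {
            current.add(new TopicPartition("topic1", p.partition()));
          }
          // Fallback path: enumerates every topic visible to the consumer.
          for (Map.Entry<String, List<PartitionInfo>> e :
              consumer.listTopics().entrySet()) {
            for (PartitionInfo p : e.getValue()) {
              current.add(new TopicPartition(e.getKey(), p.partition()));
            }
          }
        }
        System.out.println(current);
      }
    }
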
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFnTest.java
index dc3d6b5..1fcc5de 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFnTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFnTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.kafka;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -80,7 +81,33 @@ public class WatchKafkaTopicPartitionDoFnTest {
                     new PartitionInfo("topic2", 1, null, null, null))));
     WatchKafkaTopicPartitionDoFn dofnInstance =
         new WatchKafkaTopicPartitionDoFn(
-            Duration.millis(1L), consumerFn, null, ImmutableMap.of(), null);
+            Duration.millis(1L), consumerFn, null, ImmutableMap.of(), null, null);
+    assertEquals(
+        ImmutableSet.of(
+            new TopicPartition("topic1", 0),
+            new TopicPartition("topic1", 1),
+            new TopicPartition("topic2", 0),
+            new TopicPartition("topic2", 1)),
+        dofnInstance.getAllTopicPartitions());
+  }
+
+  @Test
+  public void testGetAllTopicPartitionsWithGivenTopics() throws Exception {
+    List<String> givenTopics = ImmutableList.of("topic1", "topic2");
+    when(mockConsumer.partitionsFor("topic1"))
+        .thenReturn(
+            ImmutableList.of(
+                new PartitionInfo("topic1", 0, null, null, null),
+                new PartitionInfo("topic1", 1, null, null, null)));
+    when(mockConsumer.partitionsFor("topic2"))
+        .thenReturn(
+            ImmutableList.of(
+                new PartitionInfo("topic2", 0, null, null, null),
+                new PartitionInfo("topic2", 1, null, null, null)));
+    WatchKafkaTopicPartitionDoFn dofnInstance =
+        new WatchKafkaTopicPartitionDoFn(
+            Duration.millis(1L), consumerFn, null, ImmutableMap.of(), null, givenTopics);
+    verify(mockConsumer, never()).listTopics();
     assertEquals(
         ImmutableSet.of(
             new TopicPartition("topic1", 0),
@@ -94,7 +121,7 @@ public class WatchKafkaTopicPartitionDoFnTest {
   public void testProcessElementWhenNoAvailableTopicPartition() throws Exception {
     WatchKafkaTopicPartitionDoFn dofnInstance =
         new WatchKafkaTopicPartitionDoFn(
-            Duration.millis(600L), consumerFn, null, ImmutableMap.of(), null);
+            Duration.millis(600L), consumerFn, null, ImmutableMap.of(), null, null);
     MockOutputReceiver outputReceiver = new MockOutputReceiver();
 
     when(mockConsumer.listTopics()).thenReturn(ImmutableMap.of());
@@ -112,7 +139,7 @@ public class WatchKafkaTopicPartitionDoFnTest {
     Instant startReadTime = Instant.ofEpochMilli(1L);
     WatchKafkaTopicPartitionDoFn dofnInstance =
         new WatchKafkaTopicPartitionDoFn(
-            Duration.millis(600L), consumerFn, null, ImmutableMap.of(), startReadTime);
+            Duration.millis(600L), consumerFn, null, ImmutableMap.of(), startReadTime, null);
     MockOutputReceiver outputReceiver = new MockOutputReceiver();
 
     when(mockConsumer.listTopics())
@@ -163,7 +190,8 @@ public class WatchKafkaTopicPartitionDoFnTest {
             consumerFn,
             checkStopReadingFn,
             ImmutableMap.of(),
-            startReadTime);
+            startReadTime,
+            null);
     MockOutputReceiver outputReceiver = new MockOutputReceiver();
 
     when(mockConsumer.listTopics())
@@ -198,7 +226,7 @@ public class WatchKafkaTopicPartitionDoFnTest {
   public void testOnTimerWithNoAvailableTopicPartition() throws Exception {
     WatchKafkaTopicPartitionDoFn dofnInstance =
         new WatchKafkaTopicPartitionDoFn(
-            Duration.millis(600L), consumerFn, null, ImmutableMap.of(), null);
+            Duration.millis(600L), consumerFn, null, ImmutableMap.of(), null, null);
     MockOutputReceiver outputReceiver = new MockOutputReceiver();
 
     when(mockConsumer.listTopics()).thenReturn(ImmutableMap.of());
@@ -219,7 +247,7 @@ public class WatchKafkaTopicPartitionDoFnTest {
     Instant startReadTime = Instant.ofEpochMilli(1L);
     WatchKafkaTopicPartitionDoFn dofnInstance =
         new WatchKafkaTopicPartitionDoFn(
-            Duration.millis(600L), consumerFn, null, ImmutableMap.of(), startReadTime);
+            Duration.millis(600L), consumerFn, null, ImmutableMap.of(), startReadTime, null);
     MockOutputReceiver outputReceiver = new MockOutputReceiver();
 
     when(mockConsumer.listTopics())
@@ -262,7 +290,7 @@ public class WatchKafkaTopicPartitionDoFnTest {
     Instant startReadTime = Instant.ofEpochMilli(1L);
     WatchKafkaTopicPartitionDoFn dofnInstance =
         new WatchKafkaTopicPartitionDoFn(
-            Duration.millis(600L), consumerFn, null, ImmutableMap.of(), startReadTime);
+            Duration.millis(600L), consumerFn, null, ImmutableMap.of(), startReadTime, null);
     MockOutputReceiver outputReceiver = new MockOutputReceiver();
 
     when(mockConsumer.listTopics())
@@ -316,7 +344,8 @@ public class WatchKafkaTopicPartitionDoFnTest {
             consumerFn,
             checkStopReadingFn,
             ImmutableMap.of(),
-            startReadTime);
+            startReadTime,
+            null);
     MockOutputReceiver outputReceiver = new MockOutputReceiver();
 
     when(mockConsumer.listTopics())