Posted to commits@beam.apache.org by bo...@apache.org on 2021/04/23 23:24:46 UTC
[beam] branch master updated: [BEAM-12192] Have WatchKafkaTopicPartitionDoFn respect topics given by KafkaIO
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 2f3b188 [BEAM-12192] Have WatchKafkaTopicPartitionDoFn respect topics given by KafkaIO
new 455dd10 Merge pull request #14580 from [BEAM-12192] Have WatchKafkaTopicPartitionDoFn respect topics given by KafkaIO
2f3b188 is described below
commit 2f3b18888849369c06abd38f4b2feb73f1d6b2d8
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Mon Apr 19 15:06:05 2021 -0700
[BEAM-12192] Have WatchKafkaTopicPartitionDoFn respect topics given by KafkaIO
---
.../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 28 +++++++++++---
.../sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java | 25 ++++++++----
.../io/kafka/WatchKafkaTopicPartitionDoFnTest.java | 45 ++++++++++++++++++----
3 files changed, 77 insertions(+), 21 deletions(-)
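
For context, this change only affects pipelines that enable dynamic read: WatchKafkaTopicPartitionDoFn previously listed every topic on the cluster when discovering partitions, whereas with this commit it limits discovery to the topics configured on the read (or the topics of any configured TopicPartitions). A rough usage sketch of such a pipeline is below; it assumes the standard KafkaIO API, and the broker address, topic names, and check interval are placeholders, not part of this commit:

    // Hypothetical pipeline fragment (placeholder values, not code from this commit).
    // With dynamic read enabled, partition discovery is now restricted to the
    // configured topics ("topic1", "topic2") instead of scanning the whole cluster.
    PCollection<KafkaRecord<byte[], byte[]>> records =
        pipeline.apply(
            KafkaIO.<byte[], byte[]>read()
                .withBootstrapServers("broker:9092")
                .withTopics(ImmutableList.of("topic1", "topic2"))
                .withKeyDeserializer(ByteArrayDeserializer.class)
                .withValueDeserializer(ByteArrayDeserializer.class)
                // Periodically re-check the configured topics for new partitions.
                .withDynamicRead(Duration.standardMinutes(5)));

The diff below first plumbs the configured topics through KafkaIO into the DoFn, then uses them during partition discovery, and finally extends the tests accordingly.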
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 8b6058c..29d8bac 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -29,10 +29,12 @@ import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.sdk.annotations.Experimental;
@@ -581,9 +583,9 @@ public class KafkaIO {
extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
abstract Map<String, Object> getConsumerConfig();
- abstract List<String> getTopics();
+ abstract @Nullable List<String> getTopics();
- abstract List<TopicPartition> getTopicPartitions();
+ abstract @Nullable List<TopicPartition> getTopicPartitions();
abstract @Nullable Coder<K> getKeyCoder();
@@ -839,7 +841,8 @@ public class KafkaIO {
*/
public Read<K, V> withTopics(List<String> topics) {
checkState(
- getTopicPartitions().isEmpty(), "Only topics or topicPartitions can be set, not both");
+ getTopicPartitions() == null || getTopicPartitions().isEmpty(),
+ "Only topics or topicPartitions can be set, not both");
return toBuilder().setTopics(ImmutableList.copyOf(topics)).build();
}
@@ -851,7 +854,9 @@ public class KafkaIO {
* partitions are distributed among the splits.
*/
public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
- checkState(getTopics().isEmpty(), "Only topics or topicPartitions can be set, not both");
+ checkState(
+ getTopics() == null || getTopics().isEmpty(),
+ "Only topics or topicPartitions can be set, not both");
return toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
}
@@ -1170,7 +1175,8 @@ public class KafkaIO {
// construction time. But it requires enabling beam_fn_api.
if (!isDynamicRead()) {
checkArgument(
- getTopics().size() > 0 || getTopicPartitions().size() > 0,
+ (getTopics() != null && getTopics().size() > 0)
+ || (getTopicPartitions() != null && getTopicPartitions().size() > 0),
"Either withTopic(), withTopics() or withTopicPartitions() is required");
} else {
checkArgument(
@@ -1327,6 +1333,15 @@ public class KafkaIO {
}
PCollection<KafkaSourceDescriptor> output;
if (kafkaRead.isDynamicRead()) {
+ Set<String> topics = new HashSet<>();
+ if (kafkaRead.getTopics() != null && kafkaRead.getTopics().size() > 0) {
+ topics.addAll(kafkaRead.getTopics());
+ }
+ if (kafkaRead.getTopicPartitions() != null && kafkaRead.getTopicPartitions().size() > 0) {
+ for (TopicPartition topicPartition : kafkaRead.getTopicPartitions()) {
+ topics.add(topicPartition.topic());
+ }
+ }
output =
input
.getPipeline()
@@ -1343,7 +1358,8 @@ public class KafkaIO {
kafkaRead.getConsumerFactoryFn(),
kafkaRead.getCheckStopReadingFn(),
kafkaRead.getConsumerConfig(),
- kafkaRead.getStartReadTime())));
+ kafkaRead.getStartReadTime(),
+ topics.stream().collect(Collectors.toList()))));
} else {
output =
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java
index fc9cc62..f021e36 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java
@@ -66,17 +66,21 @@ class WatchKafkaTopicPartitionDoFn extends DoFn<KV<byte[], byte[]>, KafkaSourceD
private static final String COUNTER_NAMESPACE = "watch_kafka_topic_partition";
+ private final List<String> topics;
+
WatchKafkaTopicPartitionDoFn(
Duration checkDuration,
SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> kafkaConsumerFactoryFn,
SerializableFunction<TopicPartition, Boolean> checkStopReadingFn,
Map<String, Object> kafkaConsumerConfig,
- Instant startReadTime) {
+ Instant startReadTime,
+ List<String> topics) {
this.checkDuration = checkDuration == null ? DEFAULT_CHECK_DURATION : checkDuration;
this.kafkaConsumerFactoryFn = kafkaConsumerFactoryFn;
this.checkStopReadingFn = checkStopReadingFn;
this.kafkaConsumerConfig = kafkaConsumerConfig;
this.startReadTime = startReadTime;
+ this.topics = topics;
}
@TimerId(TIMER_ID)
@@ -89,13 +93,21 @@ class WatchKafkaTopicPartitionDoFn extends DoFn<KV<byte[], byte[]>, KafkaSourceD
@VisibleForTesting
Set<TopicPartition> getAllTopicPartitions() {
Set<TopicPartition> current = new HashSet<>();
- // TODO(BEAM-12192): Respect given topics from KafkaIO.
try (Consumer<byte[], byte[]> kafkaConsumer =
kafkaConsumerFactoryFn.apply(kafkaConsumerConfig)) {
- for (Map.Entry<String, List<PartitionInfo>> topicInfo :
- kafkaConsumer.listTopics().entrySet()) {
- for (PartitionInfo partition : topicInfo.getValue()) {
- current.add(new TopicPartition(topicInfo.getKey(), partition.partition()));
+ if (topics != null && !topics.isEmpty()) {
+ for (String topic : topics) {
+ for (PartitionInfo partition : kafkaConsumer.partitionsFor(topic)) {
+ current.add(new TopicPartition(topic, partition.partition()));
+ }
+ }
+
+ } else {
+ for (Map.Entry<String, List<PartitionInfo>> topicInfo :
+ kafkaConsumer.listTopics().entrySet()) {
+ for (PartitionInfo partition : topicInfo.getValue()) {
+ current.add(new TopicPartition(topicInfo.getKey(), partition.partition()));
+ }
}
}
}
@@ -122,7 +134,6 @@ class WatchKafkaTopicPartitionDoFn extends DoFn<KV<byte[], byte[]>, KafkaSourceD
});
timer.offset(checkDuration).setRelative();
- ;
}
@OnTimer(TIMER_ID)
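
In short, getAllTopicPartitions() now prefers the topics handed in by KafkaIO and only falls back to a full listTopics() scan when none were given. A minimal standalone sketch of that selection logic, written against the plain Kafka Consumer API (the method name and structure here are illustrative, not the exact Beam code):

    // Uses org.apache.kafka.clients.consumer.Consumer, org.apache.kafka.common.PartitionInfo,
    // and org.apache.kafka.common.TopicPartition.
    static Set<TopicPartition> discoverTopicPartitions(
        Consumer<byte[], byte[]> consumer, @Nullable List<String> topics) {
      Set<TopicPartition> current = new HashSet<>();
      if (topics != null && !topics.isEmpty()) {
        // Topics were given by KafkaIO: only fetch partition metadata for those topics.
        for (String topic : topics) {
          for (PartitionInfo partition : consumer.partitionsFor(topic)) {
            current.add(new TopicPartition(topic, partition.partition()));
          }
        }
      } else {
        // No topics configured: fall back to listing every topic on the cluster.
        for (Map.Entry<String, List<PartitionInfo>> topicInfo : consumer.listTopics().entrySet()) {
          for (PartitionInfo partition : topicInfo.getValue()) {
            current.add(new TopicPartition(topicInfo.getKey(), partition.partition()));
          }
        }
      }
      return current;
    }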
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFnTest.java
index dc3d6b5..1fcc5de 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFnTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFnTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.kafka;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -80,7 +81,33 @@ public class WatchKafkaTopicPartitionDoFnTest {
new PartitionInfo("topic2", 1, null, null, null))));
WatchKafkaTopicPartitionDoFn dofnInstance =
new WatchKafkaTopicPartitionDoFn(
- Duration.millis(1L), consumerFn, null, ImmutableMap.of(), null);
+ Duration.millis(1L), consumerFn, null, ImmutableMap.of(), null, null);
+ assertEquals(
+ ImmutableSet.of(
+ new TopicPartition("topic1", 0),
+ new TopicPartition("topic1", 1),
+ new TopicPartition("topic2", 0),
+ new TopicPartition("topic2", 1)),
+ dofnInstance.getAllTopicPartitions());
+ }
+
+ @Test
+ public void testGetAllTopicPartitionsWithGivenTopics() throws Exception {
+ List<String> givenTopics = ImmutableList.of("topic1", "topic2");
+ when(mockConsumer.partitionsFor("topic1"))
+ .thenReturn(
+ ImmutableList.of(
+ new PartitionInfo("topic1", 0, null, null, null),
+ new PartitionInfo("topic1", 1, null, null, null)));
+ when(mockConsumer.partitionsFor("topic2"))
+ .thenReturn(
+ ImmutableList.of(
+ new PartitionInfo("topic2", 0, null, null, null),
+ new PartitionInfo("topic2", 1, null, null, null)));
+ WatchKafkaTopicPartitionDoFn dofnInstance =
+ new WatchKafkaTopicPartitionDoFn(
+ Duration.millis(1L), consumerFn, null, ImmutableMap.of(), null, givenTopics);
+ verify(mockConsumer, never()).listTopics();
assertEquals(
ImmutableSet.of(
new TopicPartition("topic1", 0),
@@ -94,7 +121,7 @@ public class WatchKafkaTopicPartitionDoFnTest {
public void testProcessElementWhenNoAvailableTopicPartition() throws Exception {
WatchKafkaTopicPartitionDoFn dofnInstance =
new WatchKafkaTopicPartitionDoFn(
- Duration.millis(600L), consumerFn, null, ImmutableMap.of(), null);
+ Duration.millis(600L), consumerFn, null, ImmutableMap.of(), null, null);
MockOutputReceiver outputReceiver = new MockOutputReceiver();
when(mockConsumer.listTopics()).thenReturn(ImmutableMap.of());
@@ -112,7 +139,7 @@ public class WatchKafkaTopicPartitionDoFnTest {
Instant startReadTime = Instant.ofEpochMilli(1L);
WatchKafkaTopicPartitionDoFn dofnInstance =
new WatchKafkaTopicPartitionDoFn(
- Duration.millis(600L), consumerFn, null, ImmutableMap.of(), startReadTime);
+ Duration.millis(600L), consumerFn, null, ImmutableMap.of(), startReadTime, null);
MockOutputReceiver outputReceiver = new MockOutputReceiver();
when(mockConsumer.listTopics())
@@ -163,7 +190,8 @@ public class WatchKafkaTopicPartitionDoFnTest {
consumerFn,
checkStopReadingFn,
ImmutableMap.of(),
- startReadTime);
+ startReadTime,
+ null);
MockOutputReceiver outputReceiver = new MockOutputReceiver();
when(mockConsumer.listTopics())
@@ -198,7 +226,7 @@ public class WatchKafkaTopicPartitionDoFnTest {
public void testOnTimerWithNoAvailableTopicPartition() throws Exception {
WatchKafkaTopicPartitionDoFn dofnInstance =
new WatchKafkaTopicPartitionDoFn(
- Duration.millis(600L), consumerFn, null, ImmutableMap.of(), null);
+ Duration.millis(600L), consumerFn, null, ImmutableMap.of(), null, null);
MockOutputReceiver outputReceiver = new MockOutputReceiver();
when(mockConsumer.listTopics()).thenReturn(ImmutableMap.of());
@@ -219,7 +247,7 @@ public class WatchKafkaTopicPartitionDoFnTest {
Instant startReadTime = Instant.ofEpochMilli(1L);
WatchKafkaTopicPartitionDoFn dofnInstance =
new WatchKafkaTopicPartitionDoFn(
- Duration.millis(600L), consumerFn, null, ImmutableMap.of(), startReadTime);
+ Duration.millis(600L), consumerFn, null, ImmutableMap.of(), startReadTime, null);
MockOutputReceiver outputReceiver = new MockOutputReceiver();
when(mockConsumer.listTopics())
@@ -262,7 +290,7 @@ public class WatchKafkaTopicPartitionDoFnTest {
Instant startReadTime = Instant.ofEpochMilli(1L);
WatchKafkaTopicPartitionDoFn dofnInstance =
new WatchKafkaTopicPartitionDoFn(
- Duration.millis(600L), consumerFn, null, ImmutableMap.of(), startReadTime);
+ Duration.millis(600L), consumerFn, null, ImmutableMap.of(), startReadTime, null);
MockOutputReceiver outputReceiver = new MockOutputReceiver();
when(mockConsumer.listTopics())
@@ -316,7 +344,8 @@ public class WatchKafkaTopicPartitionDoFnTest {
consumerFn,
checkStopReadingFn,
ImmutableMap.of(),
- startReadTime);
+ startReadTime,
+ null);
MockOutputReceiver outputReceiver = new MockOutputReceiver();
when(mockConsumer.listTopics())