You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2021/12/09 20:48:48 UTC
[beam] branch master updated: [BEAM-13171] Support for stopReadTime on KafkaIO SDF (#15951)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 163ac6a [BEAM-13171] Support for stopReadTime on KafkaIO SDF (#15951)
163ac6a is described below
commit 163ac6a3c10c26898ad89ca8bedde8ef78ee7ee2
Author: Mostafa Aghajani <ag...@live.com>
AuthorDate: Thu Dec 9 22:47:53 2021 +0200
[BEAM-13171] Support for stopReadTime on KafkaIO SDF (#15951)
* + Support for stopReadTime on ReadFromKafkaDoFn (SDF)
* + Tests for initial restriction when stop offset or stop read time are present
* Update CHANGES.md
* Update CHANGES.md
Move the change under I/O
* + Fix KafkaIO doc typos
+ Check and prevent stopReadTime use on Unbounded implementation
+ Check for mutual exclusive usage of startOffset/startReadTime and stopOffset/stopReadTime
+ Remove unnecessary endOffset check (tryClaim already guarantees that)
+ Formatting fixes
* + Remove confusing endOffset checks from ReadFromKafkaDoFn
+ Move the release note to 2.36.0
---
CHANGES.md | 32 +++++++++
.../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 48 ++++++++++++-
.../beam/sdk/io/kafka/KafkaSourceDescriptor.java | 36 +++++++++-
.../beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 20 +++++-
.../sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java | 9 ++-
.../beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java | 83 +++++++++++++++++-----
.../io/kafka/WatchKafkaTopicPartitionDoFnTest.java | 20 +++---
7 files changed, 216 insertions(+), 32 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 6c6061a..8e46d8a 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -51,6 +51,38 @@
* ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
-->
+# [2.36.0] - Unreleased
+
+## Highlights
+
+* New highly anticipated feature X added to Python SDK ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+* New highly anticipated feature Y added to Java SDK ([BEAM-Y](https://issues.apache.org/jira/browse/BEAM-Y)).
+
+## I/Os
+
+* Support for stopReadTime on KafkaIO SDF (Java) ([BEAM-13171](https://issues.apache.org/jira/browse/BEAM-13171)).
+
+## New Features / Improvements
+
+* X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+* Remote packages can now be downloaded from locations supported by apache_beam.io.filesystems. The files will be downloaded on Stager and uploaded to staging location. For more information, see [BEAM-11275](https://issues.apache.org/jira/browse/BEAM-11275).
+
+## Breaking Changes
+
+* X behavior was changed ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+
+## Deprecations
+
+* X behavior is deprecated and will be removed in X versions ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+
+## Bugfixes
+
+* Fixed X (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+
+## Known Issues
+
+* ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+
# [2.35.0] - Unreleased
## Highlights
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index cbfe4c2..8164daf 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -628,6 +628,8 @@ public class KafkaIO {
abstract @Nullable Instant getStartReadTime();
+ abstract @Nullable Instant getStopReadTime();
+
abstract boolean isCommitOffsetsInFinalizeEnabled();
abstract boolean isDynamicRead();
@@ -670,6 +672,8 @@ public class KafkaIO {
abstract Builder<K, V> setStartReadTime(Instant startReadTime);
+ abstract Builder<K, V> setStopReadTime(Instant stopReadTime);
+
abstract Builder<K, V> setCommitOffsetsInFinalizeEnabled(boolean commitOffsetInFinalize);
abstract Builder<K, V> setDynamicRead(boolean dynamicRead);
@@ -742,6 +746,10 @@ public class KafkaIO {
builder.setStartReadTime(Instant.ofEpochMilli(config.startReadTime));
}
+ if (config.stopReadTime != null) {
+ builder.setStopReadTime(Instant.ofEpochMilli(config.stopReadTime));
+ }
+
// We can expose dynamic read to external build when ReadFromKafkaDoFn is the default
// implementation.
builder.setDynamicRead(false);
@@ -803,6 +811,7 @@ public class KafkaIO {
private String keyDeserializer;
private String valueDeserializer;
private Long startReadTime;
+ private Long stopReadTime;
private Long maxNumRecords;
private Long maxReadTime;
private Boolean commitOffsetInFinalize;
@@ -828,6 +837,10 @@ public class KafkaIO {
this.startReadTime = startReadTime;
}
+ public void setStopReadTime(Long stopReadTime) {
+ this.stopReadTime = stopReadTime;
+ }
+
public void setMaxNumRecords(Long maxNumRecords) {
this.maxNumRecords = maxNumRecords;
}
@@ -981,7 +994,7 @@ public class KafkaIO {
* <p>Note that this take priority over start offset configuration {@code
* ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} and any auto committed offsets.
*
- * <p>This results in hard failures in either of the following two cases : 1. If one of more
+ * <p>This results in hard failures in either of the following two cases : 1. If one or more
* partitions do not contain any messages with timestamp larger than or equal to desired
* timestamp. 2. If the message format version in a partition is before 0.10.0, i.e. the
* messages do not have timestamps.
@@ -991,6 +1004,19 @@ public class KafkaIO {
}
/**
+ * Use timestamp to set up stop offset. It is only supported by Kafka Client 0.10.1.0 onwards
+ * and the message format version after 0.10.0.
+ *
+ * <p>This results in hard failures in either of the following two cases : 1. If one or more
+ * partitions do not contain any messages with timestamp larger than or equal to desired
+ * timestamp. 2. If the message format version in a partition is before 0.10.0, i.e. the
+ * messages do not have timestamps.
+ */
+ public Read<K, V> withStopReadTime(Instant stopReadTime) {
+ return toBuilder().setStopReadTime(stopReadTime).build();
+ }
+
+ /**
* Similar to {@link org.apache.beam.sdk.io.Read.Unbounded#withMaxReadTime(Duration)}. Mainly
* used for tests and demo applications.
*/
@@ -1235,6 +1261,15 @@ public class KafkaIO {
+ ". If you are building with maven, set \"kafka.clients.version\" "
+ "maven property to 0.10.1.0 or newer.");
}
+ if (getStopReadTime() != null) {
+ checkArgument(
+ ConsumerSpEL.hasOffsetsForTimes(),
+ "Consumer.offsetsForTimes is only supported by Kafka Client 0.10.1.0 onwards, "
+ + "current version of Kafka Client is "
+ + AppInfoParser.getVersion()
+ + ". If you are building with maven, set \"kafka.clients.version\" "
+ + "maven property to 0.10.1.0 or newer.");
+ }
if (isCommitOffsetsInFinalizeEnabled()) {
checkArgument(
getConsumerConfig().get(ConsumerConfig.GROUP_ID_CONFIG) != null,
@@ -1263,6 +1298,9 @@ public class KafkaIO {
|| getMaxNumRecords() < Long.MAX_VALUE
|| getMaxReadTime() != null
|| runnerRequiresLegacyRead(input.getPipeline().getOptions())) {
+ checkArgument(
+ getStopReadTime() == null,
+ "stopReadTime is set but it is only supported via SDF implementation.");
return input.apply(new ReadFromKafkaViaUnbounded<>(this, keyCoder, valueCoder));
}
return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder));
@@ -1408,6 +1446,7 @@ public class KafkaIO {
kafkaRead.getCheckStopReadingFn(),
kafkaRead.getConsumerConfig(),
kafkaRead.getStartReadTime(),
+ kafkaRead.getStopReadTime(),
topics.stream().collect(Collectors.toList()))));
} else {
@@ -1433,6 +1472,7 @@ public class KafkaIO {
this.topics = read.getTopics();
this.topicPartitions = read.getTopicPartitions();
this.startReadTime = read.getStartReadTime();
+ this.stopReadTime = read.getStopReadTime();
}
private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
@@ -1442,6 +1482,8 @@ public class KafkaIO {
private final Instant startReadTime;
+ private final Instant stopReadTime;
+
@VisibleForTesting final Map<String, Object> consumerConfig;
@VisibleForTesting final List<String> topics;
@@ -1459,7 +1501,9 @@ public class KafkaIO {
}
}
for (TopicPartition topicPartition : partitions) {
- receiver.output(KafkaSourceDescriptor.of(topicPartition, null, startReadTime, null));
+ receiver.output(
+ KafkaSourceDescriptor.of(
+ topicPartition, null, startReadTime, null, stopReadTime, null));
}
}
}
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java
index a11fe96..cde7a3b 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.io.kafka;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.util.List;
@@ -50,6 +52,14 @@ public abstract class KafkaSourceDescriptor implements Serializable {
@Nullable
abstract Instant getStartReadTime();
+ @SchemaFieldName("stop_read_offset")
+ @Nullable
+ abstract Long getStopReadOffset();
+
+ @SchemaFieldName("stop_read_time")
+ @Nullable
+ abstract Instant getStopReadTime();
+
@SchemaFieldName("bootstrap_servers")
@Nullable
abstract List<String> getBootStrapServers();
@@ -68,15 +78,30 @@ public abstract class KafkaSourceDescriptor implements Serializable {
TopicPartition topicPartition,
Long startReadOffset,
Instant startReadTime,
+ Long stopReadOffset,
+ Instant stopReadTime,
List<String> bootstrapServers) {
+ checkArguments(startReadOffset, startReadTime, stopReadOffset, stopReadTime);
return new AutoValue_KafkaSourceDescriptor(
topicPartition.topic(),
topicPartition.partition(),
startReadOffset,
startReadTime,
+ stopReadOffset,
+ stopReadTime,
bootstrapServers);
}
+ private static void checkArguments(
+ Long startReadOffset, Instant startReadTime, Long stopReadOffset, Instant stopReadTime) {
+ checkArgument(
+ startReadOffset == null || startReadTime == null,
+ "startReadOffset and startReadTime are optional but mutually exclusive. Please set only one of them.");
+ checkArgument(
+ stopReadOffset == null || stopReadTime == null,
+ "stopReadOffset and stopReadTime are optional but mutually exclusive. Please set only one of them.");
+ }
+
@SchemaCreate
@SuppressWarnings("all")
// TODO(BEAM-10677): Remove this function after AutoValueSchema is fixed.
@@ -85,8 +110,17 @@ public abstract class KafkaSourceDescriptor implements Serializable {
Integer partition,
Long start_read_offset,
Instant start_read_time,
+ Long stop_read_offset,
+ Instant stop_read_time,
List<String> bootstrap_servers) {
+ checkArguments(start_read_offset, start_read_time, stop_read_offset, stop_read_time);
return new AutoValue_KafkaSourceDescriptor(
- topic, partition, start_read_offset, start_read_time, bootstrap_servers);
+ topic,
+ partition,
+ start_read_offset,
+ start_read_time,
+ stop_read_offset,
+ stop_read_time,
+ bootstrap_servers);
}
}
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index 6c603ad..311a033 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -245,7 +245,19 @@ class ReadFromKafkaDoFn<K, V>
} else {
startOffset = offsetConsumer.position(kafkaSourceDescriptor.getTopicPartition());
}
- return new OffsetRange(startOffset, Long.MAX_VALUE);
+
+ long endOffset = Long.MAX_VALUE;
+ if (kafkaSourceDescriptor.getStopReadOffset() != null) {
+ endOffset = kafkaSourceDescriptor.getStopReadOffset();
+ } else if (kafkaSourceDescriptor.getStopReadTime() != null) {
+ endOffset =
+ ConsumerSpEL.offsetForTime(
+ offsetConsumer,
+ kafkaSourceDescriptor.getTopicPartition(),
+ kafkaSourceDescriptor.getStopReadTime());
+ }
+
+ return new OffsetRange(startOffset, endOffset);
}
}
@@ -274,8 +286,11 @@ class ReadFromKafkaDoFn<K, V>
}
@NewTracker
- public GrowableOffsetRangeTracker restrictionTracker(
+ public OffsetRangeTracker restrictionTracker(
@Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction OffsetRange restriction) {
+ if (restriction.getTo() < Long.MAX_VALUE) {
+ return new OffsetRangeTracker(restriction);
+ }
Map<String, Object> updatedConsumerConfig =
overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
KafkaLatestOffsetEstimator offsetPoller =
@@ -328,6 +343,7 @@ class ReadFromKafkaDoFn<K, V>
ConsumerSpEL.evaluateAssign(
consumer, ImmutableList.of(kafkaSourceDescriptor.getTopicPartition()));
long startOffset = tracker.currentRestriction().getFrom();
+
long expectedOffset = startOffset;
consumer.seek(kafkaSourceDescriptor.getTopicPartition(), startOffset);
ConsumerRecords<byte[], byte[]> rawRecords = ConsumerRecords.empty();
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java
index c4e6272..afe6c66 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java
@@ -66,6 +66,7 @@ class WatchKafkaTopicPartitionDoFn extends DoFn<KV<byte[], byte[]>, KafkaSourceD
private final SerializableFunction<TopicPartition, Boolean> checkStopReadingFn;
private final Map<String, Object> kafkaConsumerConfig;
private final Instant startReadTime;
+ private final Instant stopReadTime;
private static final String COUNTER_NAMESPACE = "watch_kafka_topic_partition";
@@ -77,12 +78,14 @@ class WatchKafkaTopicPartitionDoFn extends DoFn<KV<byte[], byte[]>, KafkaSourceD
SerializableFunction<TopicPartition, Boolean> checkStopReadingFn,
Map<String, Object> kafkaConsumerConfig,
Instant startReadTime,
+ Instant stopReadTime,
List<String> topics) {
this.checkDuration = checkDuration == null ? DEFAULT_CHECK_DURATION : checkDuration;
this.kafkaConsumerFactoryFn = kafkaConsumerFactoryFn;
this.checkStopReadingFn = checkStopReadingFn;
this.kafkaConsumerConfig = kafkaConsumerConfig;
this.startReadTime = startReadTime;
+ this.stopReadTime = stopReadTime;
this.topics = topics;
}
@@ -132,7 +135,8 @@ class WatchKafkaTopicPartitionDoFn extends DoFn<KV<byte[], byte[]>, KafkaSourceD
foundedTopicPartition.inc();
existingTopicPartitions.add(topicPartition);
outputReceiver.output(
- KafkaSourceDescriptor.of(topicPartition, null, startReadTime, null));
+ KafkaSourceDescriptor.of(
+ topicPartition, null, startReadTime, null, stopReadTime, null));
}
});
@@ -164,7 +168,8 @@ class WatchKafkaTopicPartitionDoFn extends DoFn<KV<byte[], byte[]>, KafkaSourceD
Metrics.counter(COUNTER_NAMESPACE, topicPartition.toString());
foundedTopicPartition.inc();
outputReceiver.output(
- KafkaSourceDescriptor.of(topicPartition, null, startReadTime, null));
+ KafkaSourceDescriptor.of(
+ topicPartition, null, startReadTime, null, stopReadTime, null));
}
});
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
index b836993..0b4150c 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
@@ -49,6 +49,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Test;
@@ -88,7 +89,8 @@ public class ReadFromKafkaDoFnTest {
private boolean isRemoved = false;
private long currentPos = 0L;
private long startOffset = 0L;
- private long startOffsetForTime = 0L;
+ private KV<Long, Instant> startOffsetForTime = KV.of(0L, Instant.now());
+ private KV<Long, Instant> stopOffsetForTime = KV.of(Long.MAX_VALUE, null);
private long numOfRecordsPerPoll;
public SimpleMockKafkaConsumer(
@@ -101,7 +103,8 @@ public class ReadFromKafkaDoFnTest {
this.isRemoved = false;
this.currentPos = 0L;
this.startOffset = 0L;
- this.startOffsetForTime = 0L;
+ this.startOffsetForTime = KV.of(0L, Instant.now());
+ this.stopOffsetForTime = KV.of(Long.MAX_VALUE, null);
this.numOfRecordsPerPoll = 0L;
}
@@ -117,8 +120,12 @@ public class ReadFromKafkaDoFnTest {
this.currentPos = pos;
}
- public void setStartOffsetForTime(long pos) {
- this.startOffsetForTime = pos;
+ public void setStartOffsetForTime(long offset, Instant time) {
+ this.startOffsetForTime = KV.of(offset, time);
+ }
+
+ public void setStopOffsetForTime(long offset, Instant time) {
+ this.stopOffsetForTime = KV.of(offset, time);
}
@Override
@@ -174,10 +181,14 @@ public class ReadFromKafkaDoFnTest {
Iterables.getOnlyElement(
timestampsToSearch.keySet().stream().collect(Collectors.toList()))
.equals(this.topicPartition));
- return ImmutableMap.of(
- topicPartition,
- new OffsetAndTimestamp(
- this.startOffsetForTime, Iterables.getOnlyElement(timestampsToSearch.values())));
+ Long timeToSearch = Iterables.getOnlyElement(timestampsToSearch.values());
+ Long returnOffset = 0L;
+ if (timeToSearch == this.startOffsetForTime.getValue().getMillis()) {
+ returnOffset = this.startOffsetForTime.getKey();
+ } else if (timeToSearch == this.stopOffsetForTime.getValue().getMillis()) {
+ returnOffset = this.stopOffsetForTime.getKey();
+ }
+ return ImmutableMap.of(topicPartition, new OffsetAndTimestamp(returnOffset, timeToSearch));
}
@Override
@@ -240,33 +251,70 @@ public class ReadFromKafkaDoFnTest {
@Test
public void testInitialRestrictionWhenHasStartOffset() throws Exception {
long expectedStartOffset = 10L;
- consumer.setStartOffsetForTime(15L);
+ consumer.setStartOffsetForTime(15L, Instant.now());
consumer.setCurrentPos(5L);
OffsetRange result =
dofnInstance.initialRestriction(
KafkaSourceDescriptor.of(
- topicPartition, expectedStartOffset, Instant.now(), ImmutableList.of()));
+ topicPartition, expectedStartOffset, null, null, null, ImmutableList.of()));
assertEquals(new OffsetRange(expectedStartOffset, Long.MAX_VALUE), result);
}
@Test
+ public void testInitialRestrictionWhenHasStopOffset() throws Exception {
+ long expectedStartOffset = 10L;
+ long expectedStopOffset = 20L;
+ consumer.setStartOffsetForTime(15L, Instant.now());
+ consumer.setStopOffsetForTime(18L, Instant.now());
+ consumer.setCurrentPos(5L);
+ OffsetRange result =
+ dofnInstance.initialRestriction(
+ KafkaSourceDescriptor.of(
+ topicPartition,
+ expectedStartOffset,
+ null,
+ expectedStopOffset,
+ null,
+ ImmutableList.of()));
+ assertEquals(new OffsetRange(expectedStartOffset, expectedStopOffset), result);
+ }
+
+ @Test
public void testInitialRestrictionWhenHasStartTime() throws Exception {
long expectedStartOffset = 10L;
- consumer.setStartOffsetForTime(expectedStartOffset);
+ Instant startReadTime = Instant.now();
+ consumer.setStartOffsetForTime(expectedStartOffset, startReadTime);
consumer.setCurrentPos(5L);
OffsetRange result =
dofnInstance.initialRestriction(
- KafkaSourceDescriptor.of(topicPartition, null, Instant.now(), ImmutableList.of()));
+ KafkaSourceDescriptor.of(
+ topicPartition, null, startReadTime, null, null, ImmutableList.of()));
assertEquals(new OffsetRange(expectedStartOffset, Long.MAX_VALUE), result);
}
@Test
+ public void testInitialRestrictionWhenHasStopTime() throws Exception {
+ long expectedStartOffset = 10L;
+ Instant startReadTime = Instant.now();
+ long expectedStopOffset = 100L;
+ Instant stopReadTime = startReadTime.plus(Duration.millis(2000));
+ consumer.setStartOffsetForTime(expectedStartOffset, startReadTime);
+ consumer.setStopOffsetForTime(expectedStopOffset, stopReadTime);
+ consumer.setCurrentPos(5L);
+ OffsetRange result =
+ dofnInstance.initialRestriction(
+ KafkaSourceDescriptor.of(
+ topicPartition, null, startReadTime, null, stopReadTime, ImmutableList.of()));
+ assertEquals(new OffsetRange(expectedStartOffset, expectedStopOffset), result);
+ }
+
+ @Test
public void testInitialRestrictionWithConsumerPosition() throws Exception {
long expectedStartOffset = 5L;
consumer.setCurrentPos(5L);
OffsetRange result =
dofnInstance.initialRestriction(
- KafkaSourceDescriptor.of(topicPartition, null, null, ImmutableList.of()));
+ KafkaSourceDescriptor.of(topicPartition, null, null, null, null, ImmutableList.of()));
assertEquals(new OffsetRange(expectedStartOffset, Long.MAX_VALUE), result);
}
@@ -277,7 +325,8 @@ public class ReadFromKafkaDoFnTest {
long startOffset = 5L;
OffsetRangeTracker tracker =
new OffsetRangeTracker(new OffsetRange(startOffset, startOffset + 3));
- KafkaSourceDescriptor descriptor = KafkaSourceDescriptor.of(topicPartition, null, null, null);
+ KafkaSourceDescriptor descriptor =
+ KafkaSourceDescriptor.of(topicPartition, null, null, null, null, null);
ProcessContinuation result =
dofnInstance.processElement(descriptor, tracker, null, (OutputReceiver) receiver);
assertEquals(ProcessContinuation.stop(), result);
@@ -292,7 +341,7 @@ public class ReadFromKafkaDoFnTest {
OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0L, Long.MAX_VALUE));
ProcessContinuation result =
dofnInstance.processElement(
- KafkaSourceDescriptor.of(topicPartition, null, null, null),
+ KafkaSourceDescriptor.of(topicPartition, null, null, null, null, null),
tracker,
null,
(OutputReceiver) receiver);
@@ -308,7 +357,7 @@ public class ReadFromKafkaDoFnTest {
OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0L, Long.MAX_VALUE));
ProcessContinuation result =
dofnInstance.processElement(
- KafkaSourceDescriptor.of(topicPartition, null, null, null),
+ KafkaSourceDescriptor.of(topicPartition, null, null, null, null, null),
tracker,
null,
(OutputReceiver) receiver);
@@ -335,7 +384,7 @@ public class ReadFromKafkaDoFnTest {
OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0L, Long.MAX_VALUE));
ProcessContinuation result =
instance.processElement(
- KafkaSourceDescriptor.of(topicPartition, null, null, null),
+ KafkaSourceDescriptor.of(topicPartition, null, null, null, null, null),
tracker,
null,
(OutputReceiver) receiver);
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFnTest.java
index eeabf40..0708e6f 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFnTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFnTest.java
@@ -82,7 +82,7 @@ public class WatchKafkaTopicPartitionDoFnTest {
new PartitionInfo("topic2", 1, null, null, null))));
WatchKafkaTopicPartitionDoFn dofnInstance =
new WatchKafkaTopicPartitionDoFn(
- Duration.millis(1L), consumerFn, null, ImmutableMap.of(), null, null);
+ Duration.millis(1L), consumerFn, null, ImmutableMap.of(), null, null, null);
assertEquals(
ImmutableSet.of(
new TopicPartition("topic1", 0),
@@ -107,7 +107,7 @@ public class WatchKafkaTopicPartitionDoFnTest {
new PartitionInfo("topic2", 1, null, null, null)));
WatchKafkaTopicPartitionDoFn dofnInstance =
new WatchKafkaTopicPartitionDoFn(
- Duration.millis(1L), consumerFn, null, ImmutableMap.of(), null, givenTopics);
+ Duration.millis(1L), consumerFn, null, ImmutableMap.of(), null, null, givenTopics);
verify(mockConsumer, never()).listTopics();
assertEquals(
ImmutableSet.of(
@@ -122,7 +122,7 @@ public class WatchKafkaTopicPartitionDoFnTest {
public void testProcessElementWhenNoAvailableTopicPartition() throws Exception {
WatchKafkaTopicPartitionDoFn dofnInstance =
new WatchKafkaTopicPartitionDoFn(
- Duration.millis(600L), consumerFn, null, ImmutableMap.of(), null, null);
+ Duration.millis(600L), consumerFn, null, ImmutableMap.of(), null, null, null);
MockOutputReceiver outputReceiver = new MockOutputReceiver();
when(mockConsumer.listTopics()).thenReturn(ImmutableMap.of());
@@ -140,7 +140,7 @@ public class WatchKafkaTopicPartitionDoFnTest {
Instant startReadTime = Instant.ofEpochMilli(1L);
WatchKafkaTopicPartitionDoFn dofnInstance =
new WatchKafkaTopicPartitionDoFn(
- Duration.millis(600L), consumerFn, null, ImmutableMap.of(), startReadTime, null);
+ Duration.millis(600L), consumerFn, null, ImmutableMap.of(), startReadTime, null, null);
MockOutputReceiver outputReceiver = new MockOutputReceiver();
when(mockConsumer.listTopics())
@@ -192,6 +192,7 @@ public class WatchKafkaTopicPartitionDoFnTest {
checkStopReadingFn,
ImmutableMap.of(),
startReadTime,
+ null,
null);
MockOutputReceiver outputReceiver = new MockOutputReceiver();
@@ -227,7 +228,7 @@ public class WatchKafkaTopicPartitionDoFnTest {
public void testOnTimerWithNoAvailableTopicPartition() throws Exception {
WatchKafkaTopicPartitionDoFn dofnInstance =
new WatchKafkaTopicPartitionDoFn(
- Duration.millis(600L), consumerFn, null, ImmutableMap.of(), null, null);
+ Duration.millis(600L), consumerFn, null, ImmutableMap.of(), null, null, null);
MockOutputReceiver outputReceiver = new MockOutputReceiver();
when(mockConsumer.listTopics()).thenReturn(ImmutableMap.of());
@@ -248,7 +249,7 @@ public class WatchKafkaTopicPartitionDoFnTest {
Instant startReadTime = Instant.ofEpochMilli(1L);
WatchKafkaTopicPartitionDoFn dofnInstance =
new WatchKafkaTopicPartitionDoFn(
- Duration.millis(600L), consumerFn, null, ImmutableMap.of(), startReadTime, null);
+ Duration.millis(600L), consumerFn, null, ImmutableMap.of(), startReadTime, null, null);
MockOutputReceiver outputReceiver = new MockOutputReceiver();
when(mockConsumer.listTopics())
@@ -291,7 +292,7 @@ public class WatchKafkaTopicPartitionDoFnTest {
Instant startReadTime = Instant.ofEpochMilli(1L);
WatchKafkaTopicPartitionDoFn dofnInstance =
new WatchKafkaTopicPartitionDoFn(
- Duration.millis(600L), consumerFn, null, ImmutableMap.of(), startReadTime, null);
+ Duration.millis(600L), consumerFn, null, ImmutableMap.of(), startReadTime, null, null);
MockOutputReceiver outputReceiver = new MockOutputReceiver();
when(mockConsumer.listTopics())
@@ -346,6 +347,7 @@ public class WatchKafkaTopicPartitionDoFnTest {
checkStopReadingFn,
ImmutableMap.of(),
startReadTime,
+ null,
null);
MockOutputReceiver outputReceiver = new MockOutputReceiver();
@@ -440,7 +442,9 @@ public class WatchKafkaTopicPartitionDoFnTest {
private Set<KafkaSourceDescriptor> generateDescriptorsFromTopicPartitions(
Set<TopicPartition> topicPartitions, Instant startReadTime) {
return topicPartitions.stream()
- .map(topicPartition -> KafkaSourceDescriptor.of(topicPartition, null, startReadTime, null))
+ .map(
+ topicPartition ->
+ KafkaSourceDescriptor.of(topicPartition, null, startReadTime, null, null, null))
.collect(Collectors.toSet());
}
}