Posted to commits@beam.apache.org by lc...@apache.org on 2021/12/09 20:48:48 UTC

[beam] branch master updated: [BEAM-13171] Support for stopReadTime on KafkaIO SDF (#15951)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 163ac6a  [BEAM-13171] Support for stopReadTime on KafkaIO SDF (#15951)
163ac6a is described below

commit 163ac6a3c10c26898ad89ca8bedde8ef78ee7ee2
Author: Mostafa Aghajani <ag...@live.com>
AuthorDate: Thu Dec 9 22:47:53 2021 +0200

    [BEAM-13171] Support for stopReadTime on KafkaIO SDF (#15951)
    
    * + Support for stopReadTime on ReadFromKafkaDoFn (SDF)
    
    * + Tests for initial restriction when stop offset or stop read time are present
    
    * Update CHANGE.md
    
    * Update CHANGES.md
    
    Move the change under I/O
    
    * + Fix KafkaIO doc typos
    + Check and prevent stopReadTime use on Unbounded implementation
    + Check for mutually exclusive usage of startOffset/startReadTime and stopOffset/stopReadTime
    + Remove unnecessary endOffset check (tryClaim already guarantees that)
    + Formatting fixes
    
    * + Remove confusing endOffset checks from ReadFromKafkaDoFn
    + Move the release note to 2.36.0
---
 CHANGES.md                                         | 32 +++++++++
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 48 ++++++++++++-
 .../beam/sdk/io/kafka/KafkaSourceDescriptor.java   | 36 +++++++++-
 .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java       | 20 +++++-
 .../sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java |  9 ++-
 .../beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java   | 83 +++++++++++++++++-----
 .../io/kafka/WatchKafkaTopicPartitionDoFnTest.java | 20 +++---
 7 files changed, 216 insertions(+), 32 deletions(-)
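
A minimal usage sketch of the new withStopReadTime (broker, topic, and timestamps below are
illustrative; stopReadTime is only honored on the SDF read path, since the legacy unbounded
implementation now rejects it via the checkArgument added in KafkaIO.expand below):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.io.kafka.KafkaRecord;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.joda.time.Instant;

    Pipeline pipeline = Pipeline.create();
    // Read "events" starting at startReadTime and stop once offsets reach the one
    // resolved for stopReadTime, instead of reading the topic unboundedly.
    PCollection<KafkaRecord<Long, String>> records =
        pipeline.apply(
            KafkaIO.<Long, String>read()
                .withBootstrapServers("broker:9092")
                .withTopic("events")
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withStartReadTime(Instant.parse("2021-12-01T00:00:00Z"))
                .withStopReadTime(Instant.parse("2021-12-02T00:00:00Z")));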

diff --git a/CHANGES.md b/CHANGES.md
index 6c6061a..8e46d8a 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -51,6 +51,38 @@
 
 * ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 -->
+# [2.36.0] - Unreleased
+
+## Highlights
+
+* New highly anticipated feature X added to Python SDK ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+* New highly anticipated feature Y added to Java SDK ([BEAM-Y](https://issues.apache.org/jira/browse/BEAM-Y)).
+
+## I/Os
+
+* Support for stopReadTime on KafkaIO SDF (Java).([BEAM-13171](https://issues.apache.org/jira/browse/BEAM-13171)).
+
+## New Features / Improvements
+
+* X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+* Remote packages can now be downloaded from locations supported by apache_beam.io.filesystems. The files will be downloaded on Stager and uploaded to staging location. For more information, see [BEAM-11275](https://issues.apache.org/jira/browse/BEAM-11275)
+
+## Breaking Changes
+
+* X behavior was changed ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+
+## Deprecations
+
+* X behavior is deprecated and will be removed in X versions ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+
+## Bugfixes
+
+* Fixed X (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+
+## Known Issues
+
+* ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+
 # [2.35.0] - Unreleased
 
 ## Highlights
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index cbfe4c2..8164daf 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -628,6 +628,8 @@ public class KafkaIO {
 
     abstract @Nullable Instant getStartReadTime();
 
+    abstract @Nullable Instant getStopReadTime();
+
     abstract boolean isCommitOffsetsInFinalizeEnabled();
 
     abstract boolean isDynamicRead();
@@ -670,6 +672,8 @@ public class KafkaIO {
 
       abstract Builder<K, V> setStartReadTime(Instant startReadTime);
 
+      abstract Builder<K, V> setStopReadTime(Instant stopReadTime);
+
       abstract Builder<K, V> setCommitOffsetsInFinalizeEnabled(boolean commitOffsetInFinalize);
 
       abstract Builder<K, V> setDynamicRead(boolean dynamicRead);
@@ -742,6 +746,10 @@ public class KafkaIO {
           builder.setStartReadTime(Instant.ofEpochMilli(config.startReadTime));
         }
 
+        if (config.stopReadTime != null) {
+          builder.setStopReadTime(Instant.ofEpochMilli(config.stopReadTime));
+        }
+
         // We can expose dynamic read to external build when ReadFromKafkaDoFn is the default
         // implementation.
         builder.setDynamicRead(false);
@@ -803,6 +811,7 @@ public class KafkaIO {
         private String keyDeserializer;
         private String valueDeserializer;
         private Long startReadTime;
+        private Long stopReadTime;
         private Long maxNumRecords;
         private Long maxReadTime;
         private Boolean commitOffsetInFinalize;
@@ -828,6 +837,10 @@ public class KafkaIO {
           this.startReadTime = startReadTime;
         }
 
+        public void setStopReadTime(Long stopReadTime) {
+          this.stopReadTime = stopReadTime;
+        }
+
         public void setMaxNumRecords(Long maxNumRecords) {
           this.maxNumRecords = maxNumRecords;
         }
@@ -981,7 +994,7 @@ public class KafkaIO {
      * <p>Note that this take priority over start offset configuration {@code
      * ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} and any auto committed offsets.
      *
-     * <p>This results in hard failures in either of the following two cases : 1. If one of more
+     * <p>This results in hard failures in either of the following two cases : 1. If one or more
      * partitions do not contain any messages with timestamp larger than or equal to desired
      * timestamp. 2. If the message format version in a partition is before 0.10.0, i.e. the
      * messages do not have timestamps.
@@ -991,6 +1004,19 @@ public class KafkaIO {
     }
 
     /**
+     * Use timestamp to set up stop offset. It is only supported by Kafka Client 0.10.1.0 onwards
+     * and the message format version after 0.10.0.
+     *
+     * <p>This results in hard failures in either of the following two cases : 1. If one or more
+     * partitions do not contain any messages with timestamp larger than or equal to desired
+     * timestamp. 2. If the message format version in a partition is before 0.10.0, i.e. the
+     * messages do not have timestamps.
+     */
+    public Read<K, V> withStopReadTime(Instant stopReadTime) {
+      return toBuilder().setStopReadTime(stopReadTime).build();
+    }
+
+    /**
      * Similar to {@link org.apache.beam.sdk.io.Read.Unbounded#withMaxReadTime(Duration)}. Mainly
      * used for tests and demo applications.
      */
@@ -1235,6 +1261,15 @@ public class KafkaIO {
                 + ". If you are building with maven, set \"kafka.clients.version\" "
                 + "maven property to 0.10.1.0 or newer.");
       }
+      if (getStopReadTime() != null) {
+        checkArgument(
+            ConsumerSpEL.hasOffsetsForTimes(),
+            "Consumer.offsetsForTimes is only supported by Kafka Client 0.10.1.0 onwards, "
+                + "current version of Kafka Client is "
+                + AppInfoParser.getVersion()
+                + ". If you are building with maven, set \"kafka.clients.version\" "
+                + "maven property to 0.10.1.0 or newer.");
+      }
       if (isCommitOffsetsInFinalizeEnabled()) {
         checkArgument(
             getConsumerConfig().get(ConsumerConfig.GROUP_ID_CONFIG) != null,
@@ -1263,6 +1298,9 @@ public class KafkaIO {
           || getMaxNumRecords() < Long.MAX_VALUE
           || getMaxReadTime() != null
           || runnerRequiresLegacyRead(input.getPipeline().getOptions())) {
+        checkArgument(
+            getStopReadTime() == null,
+            "stopReadTime is set but it is only supported via SDF implementation.");
         return input.apply(new ReadFromKafkaViaUnbounded<>(this, keyCoder, valueCoder));
       }
       return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder));
@@ -1408,6 +1446,7 @@ public class KafkaIO {
                               kafkaRead.getCheckStopReadingFn(),
                               kafkaRead.getConsumerConfig(),
                               kafkaRead.getStartReadTime(),
+                              kafkaRead.getStopReadTime(),
                               topics.stream().collect(Collectors.toList()))));
 
         } else {
@@ -1433,6 +1472,7 @@ public class KafkaIO {
         this.topics = read.getTopics();
         this.topicPartitions = read.getTopicPartitions();
         this.startReadTime = read.getStartReadTime();
+        this.stopReadTime = read.getStopReadTime();
       }
 
       private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
@@ -1442,6 +1482,8 @@ public class KafkaIO {
 
       private final Instant startReadTime;
 
+      private final Instant stopReadTime;
+
       @VisibleForTesting final Map<String, Object> consumerConfig;
 
       @VisibleForTesting final List<String> topics;
@@ -1459,7 +1501,9 @@ public class KafkaIO {
           }
         }
         for (TopicPartition topicPartition : partitions) {
-          receiver.output(KafkaSourceDescriptor.of(topicPartition, null, startReadTime, null));
+          receiver.output(
+              KafkaSourceDescriptor.of(
+                  topicPartition, null, startReadTime, null, stopReadTime, null));
         }
       }
     }
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java
index a11fe96..cde7a3b 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.io.kafka;
 
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
 import com.google.auto.value.AutoValue;
 import java.io.Serializable;
 import java.util.List;
@@ -50,6 +52,14 @@ public abstract class KafkaSourceDescriptor implements Serializable {
   @Nullable
   abstract Instant getStartReadTime();
 
+  @SchemaFieldName("stop_read_offset")
+  @Nullable
+  abstract Long getStopReadOffset();
+
+  @SchemaFieldName("stop_read_time")
+  @Nullable
+  abstract Instant getStopReadTime();
+
   @SchemaFieldName("bootstrap_servers")
   @Nullable
   abstract List<String> getBootStrapServers();
@@ -68,15 +78,30 @@ public abstract class KafkaSourceDescriptor implements Serializable {
       TopicPartition topicPartition,
       Long startReadOffset,
       Instant startReadTime,
+      Long stopReadOffset,
+      Instant stopReadTime,
       List<String> bootstrapServers) {
+    checkArguments(startReadOffset, startReadTime, stopReadOffset, stopReadTime);
     return new AutoValue_KafkaSourceDescriptor(
         topicPartition.topic(),
         topicPartition.partition(),
         startReadOffset,
         startReadTime,
+        stopReadOffset,
+        stopReadTime,
         bootstrapServers);
   }
 
+  private static void checkArguments(
+      Long startReadOffset, Instant startReadTime, Long stopReadOffset, Instant stopReadTime) {
+    checkArgument(
+        startReadOffset == null || startReadTime == null,
+        "startReadOffset and startReadTime are optional but mutually exclusive. Please set only one of them.");
+    checkArgument(
+        stopReadOffset == null || stopReadTime == null,
+        "stopReadOffset and stopReadTime are optional but mutually exclusive. Please set only one of them.");
+  }
+
   @SchemaCreate
   @SuppressWarnings("all")
   // TODO(BEAM-10677): Remove this function after AutoValueSchema is fixed.
@@ -85,8 +110,17 @@ public abstract class KafkaSourceDescriptor implements Serializable {
       Integer partition,
       Long start_read_offset,
       Instant start_read_time,
+      Long stop_read_offset,
+      Instant stop_read_time,
       List<String> bootstrap_servers) {
+    checkArguments(start_read_offset, start_read_time, stop_read_offset, stop_read_time);
     return new AutoValue_KafkaSourceDescriptor(
-        topic, partition, start_read_offset, start_read_time, bootstrap_servers);
+        topic,
+        partition,
+        start_read_offset,
+        start_read_time,
+        stop_read_offset,
+        stop_read_time,
+        bootstrap_servers);
   }
 }
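
For the descriptor-driven read path, the stop bound now travels with each
KafkaSourceDescriptor. A minimal construction sketch (topic, partition, and timestamps are
illustrative; per the checkArguments above, at most one of startReadOffset/startReadTime and
at most one of stopReadOffset/stopReadTime may be set):

    import org.apache.beam.sdk.io.kafka.KafkaSourceDescriptor;
    import org.apache.kafka.common.TopicPartition;
    import org.joda.time.Instant;

    // Argument order matches the factory above:
    // (topicPartition, startReadOffset, startReadTime, stopReadOffset, stopReadTime, bootstrapServers)
    KafkaSourceDescriptor descriptor =
        KafkaSourceDescriptor.of(
            new TopicPartition("events", 0),
            null,                                  // startReadOffset: unset, use startReadTime
            Instant.parse("2021-12-01T00:00:00Z"), // startReadTime
            null,                                  // stopReadOffset: unset, use stopReadTime
            Instant.parse("2021-12-02T00:00:00Z"), // stopReadTime
            null);                                 // bootstrapServers: fall back to consumer config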
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index 6c603ad..311a033 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -245,7 +245,19 @@ class ReadFromKafkaDoFn<K, V>
       } else {
         startOffset = offsetConsumer.position(kafkaSourceDescriptor.getTopicPartition());
       }
-      return new OffsetRange(startOffset, Long.MAX_VALUE);
+
+      long endOffset = Long.MAX_VALUE;
+      if (kafkaSourceDescriptor.getStopReadOffset() != null) {
+        endOffset = kafkaSourceDescriptor.getStopReadOffset();
+      } else if (kafkaSourceDescriptor.getStopReadTime() != null) {
+        endOffset =
+            ConsumerSpEL.offsetForTime(
+                offsetConsumer,
+                kafkaSourceDescriptor.getTopicPartition(),
+                kafkaSourceDescriptor.getStopReadTime());
+      }
+
+      return new OffsetRange(startOffset, endOffset);
     }
   }
 
@@ -274,8 +286,11 @@ class ReadFromKafkaDoFn<K, V>
   }
 
   @NewTracker
-  public GrowableOffsetRangeTracker restrictionTracker(
+  public OffsetRangeTracker restrictionTracker(
       @Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction OffsetRange restriction) {
+    if (restriction.getTo() < Long.MAX_VALUE) {
+      return new OffsetRangeTracker(restriction);
+    }
     Map<String, Object> updatedConsumerConfig =
         overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
     KafkaLatestOffsetEstimator offsetPoller =
@@ -328,6 +343,7 @@ class ReadFromKafkaDoFn<K, V>
       ConsumerSpEL.evaluateAssign(
           consumer, ImmutableList.of(kafkaSourceDescriptor.getTopicPartition()));
       long startOffset = tracker.currentRestriction().getFrom();
+
       long expectedOffset = startOffset;
       consumer.seek(kafkaSourceDescriptor.getTopicPartition(), startOffset);
       ConsumerRecords<byte[], byte[]> rawRecords = ConsumerRecords.empty();
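
The runtime effect of a bounded restriction, in sketch form (offsets are illustrative): when
a stop offset or stop read time resolves to a finite end offset, restrictionTracker above
returns a plain OffsetRangeTracker rather than a growable one, so tryClaim fails at the end
offset and the DoFn stops instead of polling indefinitely.

    import org.apache.beam.sdk.io.range.OffsetRange;
    import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;

    OffsetRange restriction = new OffsetRange(10L, 20L); // e.g. stopReadOffset resolved to 20
    OffsetRangeTracker tracker = new OffsetRangeTracker(restriction);
    for (long offset = restriction.getFrom(); tracker.tryClaim(offset); offset++) {
      // A record at `offset` would be emitted here; tryClaim(20) returns false,
      // which is what makes processElement return ProcessContinuation.stop().
    }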
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java
index c4e6272..afe6c66 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java
@@ -66,6 +66,7 @@ class WatchKafkaTopicPartitionDoFn extends DoFn<KV<byte[], byte[]>, KafkaSourceD
   private final SerializableFunction<TopicPartition, Boolean> checkStopReadingFn;
   private final Map<String, Object> kafkaConsumerConfig;
   private final Instant startReadTime;
+  private final Instant stopReadTime;
 
   private static final String COUNTER_NAMESPACE = "watch_kafka_topic_partition";
 
@@ -77,12 +78,14 @@ class WatchKafkaTopicPartitionDoFn extends DoFn<KV<byte[], byte[]>, KafkaSourceD
       SerializableFunction<TopicPartition, Boolean> checkStopReadingFn,
       Map<String, Object> kafkaConsumerConfig,
       Instant startReadTime,
+      Instant stopReadTime,
       List<String> topics) {
     this.checkDuration = checkDuration == null ? DEFAULT_CHECK_DURATION : checkDuration;
     this.kafkaConsumerFactoryFn = kafkaConsumerFactoryFn;
     this.checkStopReadingFn = checkStopReadingFn;
     this.kafkaConsumerConfig = kafkaConsumerConfig;
     this.startReadTime = startReadTime;
+    this.stopReadTime = stopReadTime;
     this.topics = topics;
   }
 
@@ -132,7 +135,8 @@ class WatchKafkaTopicPartitionDoFn extends DoFn<KV<byte[], byte[]>, KafkaSourceD
             foundedTopicPartition.inc();
             existingTopicPartitions.add(topicPartition);
             outputReceiver.output(
-                KafkaSourceDescriptor.of(topicPartition, null, startReadTime, null));
+                KafkaSourceDescriptor.of(
+                    topicPartition, null, startReadTime, null, stopReadTime, null));
           }
         });
 
@@ -164,7 +168,8 @@ class WatchKafkaTopicPartitionDoFn extends DoFn<KV<byte[], byte[]>, KafkaSourceD
                 Metrics.counter(COUNTER_NAMESPACE, topicPartition.toString());
             foundedTopicPartition.inc();
             outputReceiver.output(
-                KafkaSourceDescriptor.of(topicPartition, null, startReadTime, null));
+                KafkaSourceDescriptor.of(
+                    topicPartition, null, startReadTime, null, stopReadTime, null));
           }
         });
 
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
index b836993..0b4150c 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
@@ -49,6 +49,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.checkerframework.checker.initialization.qual.Initialized;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Test;
@@ -88,7 +89,8 @@ public class ReadFromKafkaDoFnTest {
     private boolean isRemoved = false;
     private long currentPos = 0L;
     private long startOffset = 0L;
-    private long startOffsetForTime = 0L;
+    private KV<Long, Instant> startOffsetForTime = KV.of(0L, Instant.now());
+    private KV<Long, Instant> stopOffsetForTime = KV.of(Long.MAX_VALUE, null);
     private long numOfRecordsPerPoll;
 
     public SimpleMockKafkaConsumer(
@@ -101,7 +103,8 @@ public class ReadFromKafkaDoFnTest {
       this.isRemoved = false;
       this.currentPos = 0L;
       this.startOffset = 0L;
-      this.startOffsetForTime = 0L;
+      this.startOffsetForTime = KV.of(0L, Instant.now());
+      this.stopOffsetForTime = KV.of(Long.MAX_VALUE, null);
       this.numOfRecordsPerPoll = 0L;
     }
 
@@ -117,8 +120,12 @@ public class ReadFromKafkaDoFnTest {
       this.currentPos = pos;
     }
 
-    public void setStartOffsetForTime(long pos) {
-      this.startOffsetForTime = pos;
+    public void setStartOffsetForTime(long offset, Instant time) {
+      this.startOffsetForTime = KV.of(offset, time);
+    }
+
+    public void setStopOffsetForTime(long offset, Instant time) {
+      this.stopOffsetForTime = KV.of(offset, time);
     }
 
     @Override
@@ -174,10 +181,14 @@ public class ReadFromKafkaDoFnTest {
           Iterables.getOnlyElement(
                   timestampsToSearch.keySet().stream().collect(Collectors.toList()))
               .equals(this.topicPartition));
-      return ImmutableMap.of(
-          topicPartition,
-          new OffsetAndTimestamp(
-              this.startOffsetForTime, Iterables.getOnlyElement(timestampsToSearch.values())));
+      Long timeToSearch = Iterables.getOnlyElement(timestampsToSearch.values());
+      Long returnOffset = 0L;
+      if (timeToSearch == this.startOffsetForTime.getValue().getMillis()) {
+        returnOffset = this.startOffsetForTime.getKey();
+      } else if (timeToSearch == this.stopOffsetForTime.getValue().getMillis()) {
+        returnOffset = this.stopOffsetForTime.getKey();
+      }
+      return ImmutableMap.of(topicPartition, new OffsetAndTimestamp(returnOffset, timeToSearch));
     }
 
     @Override
@@ -240,33 +251,70 @@ public class ReadFromKafkaDoFnTest {
   @Test
   public void testInitialRestrictionWhenHasStartOffset() throws Exception {
     long expectedStartOffset = 10L;
-    consumer.setStartOffsetForTime(15L);
+    consumer.setStartOffsetForTime(15L, Instant.now());
     consumer.setCurrentPos(5L);
     OffsetRange result =
         dofnInstance.initialRestriction(
             KafkaSourceDescriptor.of(
-                topicPartition, expectedStartOffset, Instant.now(), ImmutableList.of()));
+                topicPartition, expectedStartOffset, null, null, null, ImmutableList.of()));
     assertEquals(new OffsetRange(expectedStartOffset, Long.MAX_VALUE), result);
   }
 
   @Test
+  public void testInitialRestrictionWhenHasStopOffset() throws Exception {
+    long expectedStartOffset = 10L;
+    long expectedStopOffset = 20L;
+    consumer.setStartOffsetForTime(15L, Instant.now());
+    consumer.setStopOffsetForTime(18L, Instant.now());
+    consumer.setCurrentPos(5L);
+    OffsetRange result =
+        dofnInstance.initialRestriction(
+            KafkaSourceDescriptor.of(
+                topicPartition,
+                expectedStartOffset,
+                null,
+                expectedStopOffset,
+                null,
+                ImmutableList.of()));
+    assertEquals(new OffsetRange(expectedStartOffset, expectedStopOffset), result);
+  }
+
+  @Test
   public void testInitialRestrictionWhenHasStartTime() throws Exception {
     long expectedStartOffset = 10L;
-    consumer.setStartOffsetForTime(expectedStartOffset);
+    Instant startReadTime = Instant.now();
+    consumer.setStartOffsetForTime(expectedStartOffset, startReadTime);
     consumer.setCurrentPos(5L);
     OffsetRange result =
         dofnInstance.initialRestriction(
-            KafkaSourceDescriptor.of(topicPartition, null, Instant.now(), ImmutableList.of()));
+            KafkaSourceDescriptor.of(
+                topicPartition, null, startReadTime, null, null, ImmutableList.of()));
     assertEquals(new OffsetRange(expectedStartOffset, Long.MAX_VALUE), result);
   }
 
   @Test
+  public void testInitialRestrictionWhenHasStopTime() throws Exception {
+    long expectedStartOffset = 10L;
+    Instant startReadTime = Instant.now();
+    long expectedStopOffset = 100L;
+    Instant stopReadTime = startReadTime.plus(Duration.millis(2000));
+    consumer.setStartOffsetForTime(expectedStartOffset, startReadTime);
+    consumer.setStopOffsetForTime(expectedStopOffset, stopReadTime);
+    consumer.setCurrentPos(5L);
+    OffsetRange result =
+        dofnInstance.initialRestriction(
+            KafkaSourceDescriptor.of(
+                topicPartition, null, startReadTime, null, stopReadTime, ImmutableList.of()));
+    assertEquals(new OffsetRange(expectedStartOffset, expectedStopOffset), result);
+  }
+
+  @Test
   public void testInitialRestrictionWithConsumerPosition() throws Exception {
     long expectedStartOffset = 5L;
     consumer.setCurrentPos(5L);
     OffsetRange result =
         dofnInstance.initialRestriction(
-            KafkaSourceDescriptor.of(topicPartition, null, null, ImmutableList.of()));
+            KafkaSourceDescriptor.of(topicPartition, null, null, null, null, ImmutableList.of()));
     assertEquals(new OffsetRange(expectedStartOffset, Long.MAX_VALUE), result);
   }
 
@@ -277,7 +325,8 @@ public class ReadFromKafkaDoFnTest {
     long startOffset = 5L;
     OffsetRangeTracker tracker =
         new OffsetRangeTracker(new OffsetRange(startOffset, startOffset + 3));
-    KafkaSourceDescriptor descriptor = KafkaSourceDescriptor.of(topicPartition, null, null, null);
+    KafkaSourceDescriptor descriptor =
+        KafkaSourceDescriptor.of(topicPartition, null, null, null, null, null);
     ProcessContinuation result =
         dofnInstance.processElement(descriptor, tracker, null, (OutputReceiver) receiver);
     assertEquals(ProcessContinuation.stop(), result);
@@ -292,7 +341,7 @@ public class ReadFromKafkaDoFnTest {
     OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0L, Long.MAX_VALUE));
     ProcessContinuation result =
         dofnInstance.processElement(
-            KafkaSourceDescriptor.of(topicPartition, null, null, null),
+            KafkaSourceDescriptor.of(topicPartition, null, null, null, null, null),
             tracker,
             null,
             (OutputReceiver) receiver);
@@ -308,7 +357,7 @@ public class ReadFromKafkaDoFnTest {
     OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0L, Long.MAX_VALUE));
     ProcessContinuation result =
         dofnInstance.processElement(
-            KafkaSourceDescriptor.of(topicPartition, null, null, null),
+            KafkaSourceDescriptor.of(topicPartition, null, null, null, null, null),
             tracker,
             null,
             (OutputReceiver) receiver);
@@ -335,7 +384,7 @@ public class ReadFromKafkaDoFnTest {
     OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0L, Long.MAX_VALUE));
     ProcessContinuation result =
         instance.processElement(
-            KafkaSourceDescriptor.of(topicPartition, null, null, null),
+            KafkaSourceDescriptor.of(topicPartition, null, null, null, null, null),
             tracker,
             null,
             (OutputReceiver) receiver);
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFnTest.java
index eeabf40..0708e6f 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFnTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFnTest.java
@@ -82,7 +82,7 @@ public class WatchKafkaTopicPartitionDoFnTest {
                     new PartitionInfo("topic2", 1, null, null, null))));
     WatchKafkaTopicPartitionDoFn dofnInstance =
         new WatchKafkaTopicPartitionDoFn(
-            Duration.millis(1L), consumerFn, null, ImmutableMap.of(), null, null);
+            Duration.millis(1L), consumerFn, null, ImmutableMap.of(), null, null, null);
     assertEquals(
         ImmutableSet.of(
             new TopicPartition("topic1", 0),
@@ -107,7 +107,7 @@ public class WatchKafkaTopicPartitionDoFnTest {
                 new PartitionInfo("topic2", 1, null, null, null)));
     WatchKafkaTopicPartitionDoFn dofnInstance =
         new WatchKafkaTopicPartitionDoFn(
-            Duration.millis(1L), consumerFn, null, ImmutableMap.of(), null, givenTopics);
+            Duration.millis(1L), consumerFn, null, ImmutableMap.of(), null, null, givenTopics);
     verify(mockConsumer, never()).listTopics();
     assertEquals(
         ImmutableSet.of(
@@ -122,7 +122,7 @@ public class WatchKafkaTopicPartitionDoFnTest {
   public void testProcessElementWhenNoAvailableTopicPartition() throws Exception {
     WatchKafkaTopicPartitionDoFn dofnInstance =
         new WatchKafkaTopicPartitionDoFn(
-            Duration.millis(600L), consumerFn, null, ImmutableMap.of(), null, null);
+            Duration.millis(600L), consumerFn, null, ImmutableMap.of(), null, null, null);
     MockOutputReceiver outputReceiver = new MockOutputReceiver();
 
     when(mockConsumer.listTopics()).thenReturn(ImmutableMap.of());
@@ -140,7 +140,7 @@ public class WatchKafkaTopicPartitionDoFnTest {
     Instant startReadTime = Instant.ofEpochMilli(1L);
     WatchKafkaTopicPartitionDoFn dofnInstance =
         new WatchKafkaTopicPartitionDoFn(
-            Duration.millis(600L), consumerFn, null, ImmutableMap.of(), startReadTime, null);
+            Duration.millis(600L), consumerFn, null, ImmutableMap.of(), startReadTime, null, null);
     MockOutputReceiver outputReceiver = new MockOutputReceiver();
 
     when(mockConsumer.listTopics())
@@ -192,6 +192,7 @@ public class WatchKafkaTopicPartitionDoFnTest {
             checkStopReadingFn,
             ImmutableMap.of(),
             startReadTime,
+            null,
             null);
     MockOutputReceiver outputReceiver = new MockOutputReceiver();
 
@@ -227,7 +228,7 @@ public class WatchKafkaTopicPartitionDoFnTest {
   public void testOnTimerWithNoAvailableTopicPartition() throws Exception {
     WatchKafkaTopicPartitionDoFn dofnInstance =
         new WatchKafkaTopicPartitionDoFn(
-            Duration.millis(600L), consumerFn, null, ImmutableMap.of(), null, null);
+            Duration.millis(600L), consumerFn, null, ImmutableMap.of(), null, null, null);
     MockOutputReceiver outputReceiver = new MockOutputReceiver();
 
     when(mockConsumer.listTopics()).thenReturn(ImmutableMap.of());
@@ -248,7 +249,7 @@ public class WatchKafkaTopicPartitionDoFnTest {
     Instant startReadTime = Instant.ofEpochMilli(1L);
     WatchKafkaTopicPartitionDoFn dofnInstance =
         new WatchKafkaTopicPartitionDoFn(
-            Duration.millis(600L), consumerFn, null, ImmutableMap.of(), startReadTime, null);
+            Duration.millis(600L), consumerFn, null, ImmutableMap.of(), startReadTime, null, null);
     MockOutputReceiver outputReceiver = new MockOutputReceiver();
 
     when(mockConsumer.listTopics())
@@ -291,7 +292,7 @@ public class WatchKafkaTopicPartitionDoFnTest {
     Instant startReadTime = Instant.ofEpochMilli(1L);
     WatchKafkaTopicPartitionDoFn dofnInstance =
         new WatchKafkaTopicPartitionDoFn(
-            Duration.millis(600L), consumerFn, null, ImmutableMap.of(), startReadTime, null);
+            Duration.millis(600L), consumerFn, null, ImmutableMap.of(), startReadTime, null, null);
     MockOutputReceiver outputReceiver = new MockOutputReceiver();
 
     when(mockConsumer.listTopics())
@@ -346,6 +347,7 @@ public class WatchKafkaTopicPartitionDoFnTest {
             checkStopReadingFn,
             ImmutableMap.of(),
             startReadTime,
+            null,
             null);
     MockOutputReceiver outputReceiver = new MockOutputReceiver();
 
@@ -440,7 +442,9 @@ public class WatchKafkaTopicPartitionDoFnTest {
   private Set<KafkaSourceDescriptor> generateDescriptorsFromTopicPartitions(
       Set<TopicPartition> topicPartitions, Instant startReadTime) {
     return topicPartitions.stream()
-        .map(topicPartition -> KafkaSourceDescriptor.of(topicPartition, null, startReadTime, null))
+        .map(
+            topicPartition ->
+                KafkaSourceDescriptor.of(topicPartition, null, startReadTime, null, null, null))
         .collect(Collectors.toSet());
   }
 }