Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/11/15 23:36:38 UTC

[GitHub] [beam] lukecwik commented on a change in pull request #15951: [BEAM-13171] Support for stopReadTime on KafkaIO SDF

lukecwik commented on a change in pull request #15951:
URL: https://github.com/apache/beam/pull/15951#discussion_r749756335



##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
##########
@@ -376,6 +396,9 @@ public ProcessContinuation processElement(
             outputTimestamp = extractOutputTimestampFn.apply(kafkaRecord);
           }
           receiver.outputWithTimestamp(KV.of(kafkaSourceDescriptor, kafkaRecord), outputTimestamp);
+          if (expectedOffset > endOffset) {

Review comment:
       The restriction is already bounded by `endOffset`, so you don't need to check here as well: `tryClaim` above will return false once the offset falls outside the restriction.
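
To illustrate why the extra check is redundant, here is a minimal sketch of a tracker over a half-open offset range. `SimpleOffsetTracker` and `readAll` are hypothetical names written for this example, not Beam's actual tracker class; the sketch assumes the end of the restriction is exclusive and that a claim at or beyond the end fails.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a restriction tracker over the range [from, to).
public class SimpleOffsetTracker {
  private final long from;
  private final long to; // exclusive end of the restriction (assumption)

  public SimpleOffsetTracker(long from, long to) {
    if (from > to) {
      throw new IllegalArgumentException("from must not exceed to");
    }
    this.from = from;
    this.to = to;
  }

  // Returns true only while the offset lies inside the restriction.
  public boolean tryClaim(long offset) {
    return offset >= from && offset < to;
  }

  // Emits offsets until a claim fails; no separate end-offset check is needed.
  public static List<Long> readAll(SimpleOffsetTracker tracker, long startOffset) {
    List<Long> emitted = new ArrayList<>();
    long offset = startOffset;
    while (tracker.tryClaim(offset)) {
      emitted.add(offset);
      offset++;
    }
    return emitted;
  }
}
```

The loop stops on the first failed claim, so a comparison against the end offset after emitting a record would never observe anything the claim has not already caught.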

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java
##########
@@ -50,6 +50,14 @@
   @Nullable

Review comment:
       It seems like it should be an error to specify both `stop_read_time` and `stop_read_offset`, similarly for `start_read_time` and `start_read_offset`.
   
   We can add some simple error checking within the static methods `of(...)` and `create(...)`.
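
A minimal sketch of the kind of check meant here, assuming each bound is given either as an offset or as a time, never both. `ReadBoundsCheck`, `checkExclusive`, and `validate` are hypothetical names for this example, and `java.time.Instant` stands in for whatever timestamp type the descriptor actually uses.

```java
import java.time.Instant;

// Hypothetical helper; the real descriptor has its own types and factory methods.
public class ReadBoundsCheck {
  // Throws if both an offset and a time are given for the same bound.
  public static void checkExclusive(String bound, Long offset, Instant time) {
    if (offset != null && time != null) {
      throw new IllegalArgumentException(
          "Specify only one of " + bound + "_read_offset and " + bound + "_read_time");
    }
  }

  // Validates the start and stop bounds independently of each other.
  public static void validate(
      Long startOffset, Instant startTime, Long stopOffset, Instant stopTime) {
    checkExclusive("start", startOffset, startTime);
    checkExclusive("stop", stopOffset, stopTime);
  }
}
```

Mixing kinds across bounds (for example a start offset with a stop time) passes in this sketch; only two values for the same bound are rejected.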

##########
File path: sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
##########
@@ -116,10 +118,14 @@ public void setCurrentPos(long pos) {
       this.currentPos = pos;
     }
 
-    public void setStartOffsetForTime(long pos) {
+    public void setStartOffsetForTime(KV<Long, Instant> pos) {

Review comment:
       Why not make this method take two parameters and construct its own internal representation, using the KV as necessary?
   
   Please do the same for `setStopOffsetForTime`.
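
A sketch of the suggested shape, under the assumption that the test double only needs to remember an offset and its timestamp. `FakeOffsetConsumer` and its getters are hypothetical, `Map.Entry` stands in for Beam's `KV`, and `java.time.Instant` stands in for the timestamp type used by the test.

```java
import java.time.Instant;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

// Hypothetical test double; Map.Entry is a stand-in for Beam's KV.
public class FakeOffsetConsumer {
  private Map.Entry<Long, Instant> startOffsetForTime;
  private Map.Entry<Long, Instant> stopOffsetForTime;

  // Callers pass plain values; the pair is built internally.
  public void setStartOffsetForTime(long offset, Instant time) {
    this.startOffsetForTime = new SimpleImmutableEntry<>(offset, time);
  }

  public void setStopOffsetForTime(long offset, Instant time) {
    this.stopOffsetForTime = new SimpleImmutableEntry<>(offset, time);
  }

  public Map.Entry<Long, Instant> getStartOffsetForTime() {
    return startOffsetForTime;
  }

  public Map.Entry<Long, Instant> getStopOffsetForTime() {
    return stopOffsetForTime;
  }
}
```

Keeping the pair type out of the setter signatures means call sites in the test do not have to build it themselves.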

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -990,6 +1003,19 @@ public void setTimestampPolicy(String timestampPolicy) {
       return toBuilder().setStartReadTime(startReadTime).build();
     }
 
+    /**
+     * Use timestamp to set up stop offset. It is only supported by Kafka Client 0.10.1.0 onwards
+     * and the message format version after 0.10.0.
+     *
+     * <p>This results in hard failures in either of the following two cases : 1. If one of more
+     * partitions do not contain any messages with timestamp larger than or equal to desired

Review comment:
       ```suggestion
        * <p>This results in hard failures in either of the following two cases : 1. If one or more
        * partitions do not contain any messages with timestamp larger than or equal to desired
   ```
   
   Can you also fix the same typo in `withStartReadTime`?

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -1235,6 +1261,15 @@ public void setTimestampPolicy(String timestampPolicy) {
                 + ". If you are building with maven, set \"kafka.clients.version\" "
                 + "maven property to 0.10.1.0 or newer.");
       }
+      if (getStopReadTime() != null) {
+        checkArgument(
+            ConsumerSpEL.hasOffsetsForTimes(),
+            "Consumer.offsetsForTimes is only supported by Kafka Client 0.10.1.0 onwards, "
+                + "current version of Kafka Client is "
+                + AppInfoParser.getVersion()
+                + ". If you are building with maven, set \"kafka.clients.version\" "
+                + "maven property to 0.10.1.0 or newer.");
+      }

Review comment:
       Either implement support for `stopReadTime` within `ReadFromKafkaViaUnbounded` or add an error [here](https://github.com/apache/beam/blob/7d54488eca45743e379f1873eda7f6d390f97b50/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1266) stating that `stopReadTime` is only supported via the SDF implementation.

##########
File path: CHANGES.md
##########
@@ -76,6 +76,7 @@
   https://issues.apache.org/jira/browse/BEAM-11205)). For Google Cloud client library versions set by this BOM,
   see [this table](https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/24.0.0/artifact_details.html).
 * Removed avro-python3 dependency in AvroIO. Fastavro has already been our Avro library of choice on Python 3. Boolean use_fastavro is left for api compatibility, but will have no effect.([BEAM-13016](https://github.com/apache/beam/pull/15900)).
+* Support for stopReadTime on KafkaIO SDF (Java).([BEAM-13171](https://issues.apache.org/jira/browse/BEAM-13171)).

Review comment:
       It would make sense to list this under I/Os.



