Posted to issues@beam.apache.org by "Balázs Németh (Jira)" <ji...@apache.org> on 2022/05/26 03:57:00 UTC

[jira] [Created] (BEAM-14518) Support for reading Kafka topics from any startReadTime

Balázs Németh created BEAM-14518:
------------------------------------

             Summary: Support for reading Kafka topics from any startReadTime
                 Key: BEAM-14518
                 URL: https://issues.apache.org/jira/browse/BEAM-14518
             Project: Beam
          Issue Type: Bug
          Components: io-java-kafka
            Reporter: Balázs Németh


[https://github.com/apache/beam/blob/fd8546355523f67eaddc22249606fdb982fe4938/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java#L180-L198]

 

Right now the `startReadTime` config for KafkaIO.Read looks up, in every topic partition, the offset of the first message whose timestamp is newer than or equal to that timestamp. The problem arises when the timestamp is so recent that a partition has no message newer than or equal to it. In that case the code fails with an exception. In certain cases that makes no sense, as we could actually make it work.

If we don't get an offset from calling `consumer.offsetsForTimes`, we should call `endOffsets` and use the returned offset. The Kafka Javadoc describes the end offset as the offset of the last message plus one, so that is actually the offset we will have to read next time.

Even if `endOffsets` can't return an offset, we could use 0 as the offset to read from.
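
A minimal sketch of the proposed fallback order, to make the idea concrete. It is not the ConsumerSpEL code and does not call Kafka. The class `StartOffsetFallback` and the method `resolveStartOffset` are hypothetical names, and the two nullable arguments stand in for what `consumer.offsetsForTimes` and `consumer.endOffsets` would return for one partition. The sketch uses the end offset as returned, on the assumption (taken from the Kafka Javadoc) that it already points one past the last message.

```java
// Hypothetical helper illustrating the proposed fallback; not Beam's actual implementation.
class StartOffsetFallback {

    /**
     * Picks the offset to start reading from for a single partition.
     *
     * @param offsetForTime offset found for startReadTime, or null when no message
     *                      in the partition is newer than or equal to that time
     * @param endOffset     end offset of the partition, or null when it is unavailable
     */
    static long resolveStartOffset(Long offsetForTime, Long endOffset) {
        if (offsetForTime != null) {
            // Normal case: a message at or after startReadTime exists.
            return offsetForTime;
        }
        if (endOffset != null) {
            // No such message yet: start from the next offset that will be written.
            // Assumption: the end offset is already "last offset + 1".
            return endOffset;
        }
        // Nothing known about the partition: start from the beginning.
        return 0L;
    }

    public static void main(String[] args) {
        System.out.println(resolveStartOffset(42L, 100L));
        System.out.println(resolveStartOffset(null, 100L));
        System.out.println(resolveStartOffset(null, null));
    }
}
```

In a real change the two arguments would presumably come from per-partition lookups in the maps returned by the consumer, and the current exception would only remain for cases where none of the fallbacks is acceptable.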

 

Am I missing something here? Is it okay to contribute this?



--
This message was sent by Atlassian Jira
(v8.20.7#820007)