Posted to issues@beam.apache.org by "Balázs Németh (Jira)" <ji...@apache.org> on 2022/05/26 03:59:00 UTC

[jira] [Updated] (BEAM-14518) Support for reading Kafka topics from any startReadTime in Java

     [ https://issues.apache.org/jira/browse/BEAM-14518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Balázs Németh updated BEAM-14518:
---------------------------------
    Summary: Support for reading Kafka topics from any startReadTime in Java  (was: Support for reading Kafka topics from any startReadTime)

> Support for reading Kafka topics from any startReadTime in Java
> ---------------------------------------------------------------
>
>                 Key: BEAM-14518
>                 URL: https://issues.apache.org/jira/browse/BEAM-14518
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka
>            Reporter: Balázs Németh
>            Priority: P2
>
> [https://github.com/apache/beam/blob/fd8546355523f67eaddc22249606fdb982fe4938/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java#L180-L198]
>  
> Right now the 'startReadTime' config for KafkaIO.Read looks up, in every topic partition, an offset whose message is newer than or equal to that timestamp. The problem arises when the timestamp is so recent that a partition has no newer or equal message. In that case the code fails with an exception, which in certain cases makes no sense, as we could actually make it work.
> If we don't get an offset from calling `consumer.offsetsForTimes`, we should call `endOffsets` and use the returned offset, which Kafka defines as the offset of the last record + 1. That is actually the offset we will have to read next time.
> Even if `endOffsets` can't return an offset, we could use 0 as the offset to read from.
>  
> Am I missing something here? Is it okay to contribute this?
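
The fallback order proposed in the description can be sketched roughly as follows. This is only an illustration of the decision logic, not the code in ConsumerSpEL.java: `OffsetLookup`, `MapLookup` and `resolveStartOffset` are hypothetical names that stand in for `consumer.offsetsForTimes` and `endOffsets`, and partitions are plain strings here so that the sketch needs no Kafka dependency. It assumes the end offset already points at the next record to be written (the Kafka documentation describes it as the offset of the last record plus one), so the value is used as returned.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: OffsetLookup and MapLookup are hypothetical stand-ins, not Kafka or Beam APIs.
class StartOffsetFallback {

  /** Hypothetical abstraction over the two consumer lookups named in the issue. */
  interface OffsetLookup {
    /** Earliest offset whose timestamp is >= timestampMillis, or null if there is none. */
    Long offsetForTime(String partition, long timestampMillis);

    /** End offset of the partition (position of the next record), or null if unknown. */
    Long endOffset(String partition);
  }

  /** In-memory lookup used only to exercise the fallback logic. */
  static final class MapLookup implements OffsetLookup {
    private final Map<String, Long> byTime = new HashMap<>();
    private final Map<String, Long> ends = new HashMap<>();

    MapLookup withTimeOffset(String partition, long offset) {
      byTime.put(partition, offset);
      return this;
    }

    MapLookup withEndOffset(String partition, long offset) {
      ends.put(partition, offset);
      return this;
    }

    @Override
    public Long offsetForTime(String partition, long timestampMillis) {
      // The timestamp is ignored in this toy implementation.
      return byTime.get(partition);
    }

    @Override
    public Long endOffset(String partition) {
      return ends.get(partition);
    }
  }

  /** Resolves the offset to start reading from, in the order proposed in the issue. */
  static long resolveStartOffset(OffsetLookup lookup, String partition, long startReadTimeMillis) {
    // 1. Preferred: an offset at or after the requested timestamp.
    Long byTime = lookup.offsetForTime(partition, startReadTimeMillis);
    if (byTime != null) {
      return byTime;
    }
    // 2. No such record yet: start at the end of the partition instead of failing.
    Long end = lookup.endOffset(partition);
    if (end != null) {
      return end;
    }
    // 3. Nothing known about the partition: start from offset 0.
    return 0L;
  }

  public static void main(String[] args) {
    MapLookup lookup = new MapLookup()
        .withTimeOffset("topic-0", 42L)
        .withEndOffset("topic-0", 100L)
        .withEndOffset("topic-1", 7L);
    System.out.println(resolveStartOffset(lookup, "topic-0", 0L));
    System.out.println(resolveStartOffset(lookup, "topic-1", 0L));
    System.out.println(resolveStartOffset(lookup, "topic-2", 0L));
  }
}
```

In this sketch a partition with a matching record keeps today's behaviour, a partition whose records are all older than 'startReadTime' starts at its end offset, and a partition with no known end offset starts at 0.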



--
This message was sent by Atlassian Jira
(v8.20.7#820007)