You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Jozef Vilcek (JIRA)" <ji...@apache.org> on 2019/01/29 08:11:00 UTC

[jira] [Commented] (BEAM-2185) KafkaIO bounded source

    [ https://issues.apache.org/jira/browse/BEAM-2185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16754714#comment-16754714 ] 

Jozef Vilcek commented on BEAM-2185:
------------------------------------

With KafkaIO supporting settings `withMaxRecords()` and `commitOffsetsInFinalize()` one can be under impression that batch workload is possible. BEAM-6466 that commit of offsets has still some miss-behaviour. 

What I also noticed is a weak guarantees around commit of offsets. Commit is invoked by calling checkpoint and finalizing it. This is done in ReadFn processElements after reading from source enough elements and/or for long enough time here:
[https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L203]

I guess that checkpoint is being made before all elements downstream are successfully processed. It his is used in batch, on failure and restart this can loose some data. Right? Is there a way in Beam model to make sure offsets are committed only after elements are successfully processed? In batch run mode it probably means pipeline is finished with DONE state?

> KafkaIO bounded source
> ----------------------
>
>                 Key: BEAM-2185
>                 URL: https://issues.apache.org/jira/browse/BEAM-2185
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-kafka
>            Reporter: Raghu Angadi
>            Priority: Major
>
> KafkaIO could be a useful source for batch applications as well. It could implement a bounded source. The primary question is how the bounds are specified.
> One option : Source specifies a time period (say 9am-10am), and KafkaIO fetches appropriate start and end offsets based on time-index in Kafka. This would suite many batch applications that are launched on a scheduled.
> Another option is to always read till the end and commit the offsets to Kafka. Handling failures and multiple runs of a task might be complicated.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)