Posted to issues@beam.apache.org by "Raghu Angadi (JIRA)" <ji...@apache.org> on 2019/01/03 00:15:00 UTC

[jira] [Comment Edited] (BEAM-6333) allow disabling/configuring automatic watermark generation for idle kafka partitions

    [ https://issues.apache.org/jira/browse/BEAM-6333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732539#comment-16732539 ] 

Raghu Angadi edited comment on BEAM-6333 at 1/3/19 12:14 AM:
-------------------------------------------------------------

> If there is some automatic timeout then you can never be completely sure that no record with an older timestamp will be emitted? (What if the kafka broker or network is slow for some reason?)

It is not a timeout. The idle watermark is set to 2 seconds before the {{last backlog check time}}. Please see how that time is set [1]: it is the timestamp taken just before trying to fetch the latest offsets from Kafka. So if the network is unusually slow, the watermark is simply that much further in the past.

[1] : https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L686
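A minimal sketch of the behavior described above. It assumes a simplified, hypothetical class (IdlePartitionWatermark, not the actual Beam code) and uses java.time instead of Joda-Time. When a partition has no backlog, the sketch advances the watermark to 2 seconds before the backlog check time and never moves it backwards. No timer is involved: a slow offset fetch only makes the check time, and therefore the watermark, older.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical, simplified stand-in for the idle-partition logic of a Kafka
// timestamp policy. It is NOT the Beam implementation; names are illustrative.
final class IdlePartitionWatermark {
  // Fixed delta, matching the "2 seconds" described in the comment.
  static final Duration IDLE_DELTA = Duration.ofSeconds(2);

  private Instant currentWatermark;

  IdlePartitionWatermark(Instant initialWatermark) {
    this.currentWatermark = initialWatermark;
  }

  /**
   * @param messageBacklog   unread messages in the partition; 0 means caught up,
   *                         any other value (including "unknown") means not idle
   * @param backlogCheckTime wall-clock time recorded BEFORE fetching latest offsets
   */
  Instant getWatermark(long messageBacklog, Instant backlogCheckTime) {
    if (messageBacklog == 0) {
      // Caught up: the watermark may advance to (check time - delta).
      Instant idle = backlogCheckTime.minus(IDLE_DELTA);
      if (idle.isAfter(currentWatermark)) {
        currentWatermark = idle; // only ever moves forward
      }
    }
    // With a backlog, the watermark is not advanced here.
    return currentWatermark;
  }
}
```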

> (Or at least, you can not be sure in a deterministic way! That is, in a way that can later be repeated by a new instance of the same computation.)

Which determinism are you specifically referring to? Could you give an example?

> Not sure if you want to cater to this kind of use case... (and actually I already worked around it by implementing my own TimestampPolicyFactory, but still thought I may as well contribute this)

We want to cater to all the common use cases. I saw your own factory; if that is exactly what you want, it is fine to implement it that way. The API is meant to let users cater to specific use cases.
I am trying to understand your specific concern in this case, though. By the way, your implementation will throw an exception, since the watermark can be queried even before any records are read; it is better to return 'min timestamp' in that case.
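A sketch of the suggested fix. It assumes a hypothetical custom policy (LastRecordWatermarkPolicy) that uses the latest record timestamp as the watermark, which presumes timestamps are non-decreasing within a partition. Until the first record arrives, getWatermark() returns a minimum timestamp instead of dereferencing a missing value. Instant.MIN stands in for Beam's BoundedWindow.TIMESTAMP_MIN_VALUE.

```java
import java.time.Instant;

// Hypothetical custom policy sketch; not a Beam class.
final class LastRecordWatermarkPolicy {
  // Stand-in for Beam's BoundedWindow.TIMESTAMP_MIN_VALUE.
  static final Instant MIN_TIMESTAMP = Instant.MIN;

  // Stays null until the first record is read.
  private Instant lastRecordTimestamp;

  /** Records a timestamp and returns it unchanged as the record's event time. */
  Instant onRecord(Instant recordTimestamp) {
    if (lastRecordTimestamp == null || recordTimestamp.isAfter(lastRecordTimestamp)) {
      lastRecordTimestamp = recordTimestamp;
    }
    return recordTimestamp;
  }

  Instant getWatermark() {
    // The watermark may be queried before any record is read,
    // so fall back to the minimum timestamp instead of failing.
    return lastRecordTimestamp == null ? MIN_TIMESTAMP : lastRecordTimestamp;
  }
}
```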









> allow disabling/configuring automatic watermark generation for idle kafka partitions
> ------------------------------------------------------------------------------------
>
>                 Key: BEAM-6333
>                 URL: https://issues.apache.org/jira/browse/BEAM-6333
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-kafka
>    Affects Versions: 2.9.0
>            Reporter: Jan Doms
>            Assignee: Raghu Angadi
>            Priority: Major
>              Labels: easyfix, pull-request-available
>   Original Estimate: 0.5h
>          Time Spent: 10m
>  Remaining Estimate: 20m
>
> For pipelines that require the emitted watermarks to be absolutely correct, the current behavior of using a 2-second timeout could lead to problems.
> Therefore it should be possible to disable this behavior. While changing the code, the hardcoded 2 seconds can also be made configurable. (There's already a TODO about that in the code.)
>  
> A (preliminary) PR making this change is available: https://github.com/apache/beam/pull/7382.
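One way the requested change could look, sketched with a hypothetical class (ConfigurableIdleWatermark). The names and shape do not come from the linked PR. The idle delta becomes a parameter, and an empty Optional disables idle advancement entirely, so the watermark only moves with record timestamps.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

// Hypothetical sketch of the proposal; illustrative names only.
final class ConfigurableIdleWatermark {
  // Optional.empty() disables idle-partition watermark advancement.
  private final Optional<Duration> idleDelta;
  private Instant currentWatermark;

  ConfigurableIdleWatermark(Optional<Duration> idleDelta, Instant initialWatermark) {
    this.idleDelta = idleDelta;
    this.currentWatermark = initialWatermark;
  }

  /** Advances the watermark from a record timestamp (assumes in-order timestamps). */
  void onRecord(Instant recordTimestamp) {
    if (recordTimestamp.isAfter(currentWatermark)) {
      currentWatermark = recordTimestamp;
    }
  }

  Instant getWatermark(long messageBacklog, Instant backlogCheckTime) {
    if (messageBacklog == 0 && idleDelta.isPresent()) {
      Instant idle = backlogCheckTime.minus(idleDelta.get());
      if (idle.isAfter(currentWatermark)) {
        currentWatermark = idle;
      }
    }
    return currentWatermark;
  }
}
```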



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)