Posted to issues@beam.apache.org by "Jean-Baptiste Onofré (Jira)" <ji...@apache.org> on 2019/11/07 16:40:00 UTC
[jira] [Updated] (BEAM-8347) UnboundedRabbitMqReader can fail to advance watermark if no new data comes in
[ https://issues.apache.org/jira/browse/BEAM-8347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jean-Baptiste Onofré updated BEAM-8347:
---------------------------------------
Fix Version/s: 2.18.0 (was: 2.16.0, 2.17.0)
> UnboundedRabbitMqReader can fail to advance watermark if no new data comes in
> -----------------------------------------------------------------------------
>
> Key: BEAM-8347
> URL: https://issues.apache.org/jira/browse/BEAM-8347
> Project: Beam
> Issue Type: Bug
> Components: io-java-rabbitmq
> Affects Versions: 2.15.0
> Environment: Testing has been done using the DirectRunner. I also have the DataflowRunner available.
> Reporter: Daniel Robert
> Assignee: Jean-Baptiste Onofré
> Priority: Major
> Fix For: 2.18.0
>
> Time Spent: 3h 50m
> Remaining Estimate: 0h
>
> I stumbled upon this and then found a similar Stack Overflow post: [https://stackoverflow.com/questions/55736593/apache-beam-rabbitmqio-watermark-doesnt-advance]
> When `advance()` is called and no message is available, no reader state changes: neither the CheckpointMark nor the watermark is updated. With a relatively constant rate of incoming messages this is not a problem. If data is bursty, however, the watermark stays where it was during every period without new messages, and if no further data arrives it never advances at all.
> Contrast this with PubsubIO, which makes provisions for advancing the watermark during periods of inactivity (although it, too, is imperfect: https://issues.apache.org/jira/browse/BEAM-7322 ).
> The example given in the Stack Overflow post is roughly the following:
>
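> {code:java}
> import org.apache.beam.sdk.io.rabbitmq.RabbitMqIO;
> import org.apache.beam.sdk.io.rabbitmq.RabbitMqMessage;
> import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
> import org.apache.beam.sdk.transforms.windowing.FixedWindows;
> import org.apache.beam.sdk.transforms.windowing.Window;
> import org.joda.time.Duration;
>
> // Read from the "test" queue and group messages into 10-second fixed windows
> // that emit one pane, when the watermark passes the end of the window.
> pipeline
>     .apply(RabbitMqIO.read()
>         .withUri("amqp://guest:guest@localhost:5672")
>         .withQueue("test"))
>     .apply("Windowing",
>         Window.<RabbitMqMessage>into(
>                 FixedWindows.of(Duration.standardSeconds(10)))
>             .triggering(AfterWatermark.pastEndOfWindow())
>             .withAllowedLateness(Duration.ZERO)
>             .accumulatingFiredPanes());
> {code}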
> If I push two messages into my RabbitMQ queue, I see two unacknowledged messages and a window that never fires its on-time pane.
>
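The mechanism described in the issue can be illustrated with a minimal, Beam-free Java sketch. Everything in it is hypothetical. `IdleAwareWatermarkTracker`, its methods, the 5-second idle threshold and the message timestamps are illustrative names and values. They are not the RabbitMqIO or `UnboundedSource` API. The idle rule is only loosely in the spirit of PubsubIO's inactivity handling and is not its actual algorithm. The sketch also assumes that message event time roughly tracks wall-clock time. `onMessage` models the reported behaviour, where the watermark moves only when data arrives. `onEmptyPoll` adds a rule that advances the watermark after a quiet period. `onTimePaneFires` checks whether a 10-second fixed window, as in the example pipeline, would emit its on-time pane.

```java
/**
 * Hypothetical, Beam-free model of an unbounded reader's watermark; not the
 * RabbitMqIO implementation. All times are epoch milliseconds supplied by the caller.
 */
final class IdleAwareWatermarkTracker {
  private final long idleThresholdMillis; // quiet time before the source counts as idle
  private long watermarkMillis = Long.MIN_VALUE; // current watermark
  private long lastActivityMillis; // wall-clock time of the last message (or of start-up)

  IdleAwareWatermarkTracker(long startMillis, long idleThresholdMillis) {
    this.lastActivityMillis = startMillis;
    this.idleThresholdMillis = idleThresholdMillis;
  }

  /** advance() returned a message: the watermark follows its event timestamp. */
  void onMessage(long eventTimestampMillis, long nowMillis) {
    watermarkMillis = Math.max(watermarkMillis, eventTimestampMillis);
    lastActivityMillis = nowMillis;
  }

  /**
   * advance() found nothing. The behaviour in the report corresponds to an empty
   * body here; this sketch instead assumes event time tracks wall-clock time and,
   * once the source has been quiet for idleThreshold, moves the watermark to
   * (now - idleThreshold).
   */
  void onEmptyPoll(long nowMillis) {
    if (nowMillis - lastActivityMillis >= idleThresholdMillis) {
      watermarkMillis = Math.max(watermarkMillis, nowMillis - idleThresholdMillis);
    }
  }

  long getWatermarkMillis() {
    return watermarkMillis;
  }

  /** Exclusive end of the fixed window (offset 0) that contains the timestamp. */
  static long fixedWindowEnd(long timestampMillis, long sizeMillis) {
    return Math.floorDiv(timestampMillis, sizeMillis) * sizeMillis + sizeMillis;
  }

  /** In this model, the on-time pane fires once the watermark reaches the window end. */
  static boolean onTimePaneFires(long windowEndMillis, long watermarkMillis) {
    return watermarkMillis >= windowEndMillis;
  }

  public static void main(String[] args) {
    long windowSize = 10_000L; // mirrors FixedWindows.of(Duration.standardSeconds(10))
    IdleAwareWatermarkTracker tracker = new IdleAwareWatermarkTracker(0L, 5_000L);
    // Two messages, as in the report, at 1 s and 3 s.
    tracker.onMessage(1_000L, 1_000L);
    tracker.onMessage(3_000L, 3_000L);
    long end = fixedWindowEnd(3_000L, windowSize); // 10000
    // Without the idle rule the watermark would stay at 3000 and the pane would never fire.
    System.out.println(onTimePaneFires(end, tracker.getWatermarkMillis())); // false
    tracker.onEmptyPoll(12_000L); // quiet for 9 s -> watermark 7000
    System.out.println(onTimePaneFires(end, tracker.getWatermarkMillis())); // false
    tracker.onEmptyPoll(15_000L); // quiet for 12 s -> watermark 10000
    System.out.println(onTimePaneFires(end, tracker.getWatermarkMillis())); // true
  }
}
```

The sketch advances the watermark to `now - idleThreshold` rather than to `now`, which leaves some slack for messages still in flight. A message that arrives after the watermark has passed the end of its window would be late. With `withAllowedLateness(Duration.ZERO)`, as in the example, such a message would be dropped.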
--
This message was sent by Atlassian Jira
(v8.3.4#803005)