Posted to issues@beam.apache.org by "Jean-Baptiste Onofré (Jira)" <ji...@apache.org> on 2019/11/07 16:39:00 UTC

[jira] [Resolved] (BEAM-8347) UnboundedRabbitMqReader can fail to advance watermark if no new data comes in

     [ https://issues.apache.org/jira/browse/BEAM-8347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jean-Baptiste Onofré resolved BEAM-8347.
----------------------------------------
    Fix Version/s: 2.16.0
       Resolution: Fixed

> UnboundedRabbitMqReader can fail to advance watermark if no new data comes in
> -----------------------------------------------------------------------------
>
>                 Key: BEAM-8347
>                 URL: https://issues.apache.org/jira/browse/BEAM-8347
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-rabbitmq
>    Affects Versions: 2.15.0
>         Environment: Testing has been done using the DirectRunner. I also have the DataflowRunner available.
>            Reporter: Daniel Robert
>            Assignee: Jean-Baptiste Onofré
>            Priority: Major
>             Fix For: 2.16.0
>
>          Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> I stumbled upon this and then saw a similar StackOverflow post: [https://stackoverflow.com/questions/55736593/apache-beam-rabbitmqio-watermark-doesnt-advance]
> When `advance()` is called and there are no messages, no state changes, including the CheckpointMark and the watermark. If new messages arrive at a relatively constant rate, this is not a problem. If data is bursty, however, and there are periods with no new messages, the watermark does not advance again until new data arrives.
> Contrast this with PubsubIO, which has logic to advance the watermark during periods of inactivity (although it, too, is imperfect: https://issues.apache.org/jira/browse/BEAM-7322).
> The example given in the StackOverflow post is something like this:
>  
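> {code:java}
> import org.apache.beam.sdk.io.rabbitmq.RabbitMqIO;
> import org.apache.beam.sdk.io.rabbitmq.RabbitMqMessage;
> import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
> import org.apache.beam.sdk.transforms.windowing.FixedWindows;
> import org.apache.beam.sdk.transforms.windowing.Window;
> import org.joda.time.Duration;
>
> // Read from a local RabbitMQ queue and group messages into 10-second fixed windows.
> pipeline
>     .apply(RabbitMqIO.read()
>         .withUri("amqp://guest:guest@localhost:5672")
>         .withQueue("test"))  // close the read transform before applying the window
>     .apply("Windowing",
>         Window.<RabbitMqMessage>into(
>                 FixedWindows.of(Duration.standardSeconds(10)))
>             // Emit the on-time pane once the watermark passes the end of the window.
>             .triggering(AfterWatermark.pastEndOfWindow())
>             .withAllowedLateness(Duration.ZERO)
>             .accumulatingFiredPanes());
> {code}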
> If I push two messages into my RabbitMQ queue, I see two unacknowledged messages and a window whose on-time trigger never fires.
>  
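
To make the failure mode described above concrete, here is a minimal, Beam-independent sketch in plain Java. It models a reader whose watermark only moves when a message arrives, together with one possible idle policy. The policy is an assumption and is not taken from the RabbitMqIO source; the actual fix for this issue may differ. Under this policy, once `advance()` has found no messages for longer than a threshold, the watermark is moved to the current time minus that threshold. The class and method names (`IdleWatermarkSketch`, `WatermarkTracker`, `onMessage`, `onIdle`, `windowFires`) and the 5-second threshold are hypothetical. `windowFires` mimics the `AfterWatermark.pastEndOfWindow()` condition: a window emits its on-time pane only once the watermark reaches the window's end.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical model of an unbounded reader's watermark. This is NOT the
 * RabbitMqIO implementation; it only illustrates advancing the watermark
 * when advance() keeps finding no new messages.
 */
public class IdleWatermarkSketch {

  /** Hypothetical tracker: the names and the idle policy are assumptions. */
  static final class WatermarkTracker {
    private final Duration idleThreshold;
    private Instant watermark = Instant.EPOCH;
    private Instant lastDataArrival = null;

    WatermarkTracker(Duration idleThreshold) {
      this.idleThreshold = idleThreshold;
    }

    /** Called when advance() returns a message with the given event timestamp. */
    void onMessage(Instant eventTime, Instant now) {
      if (eventTime.isAfter(watermark)) {
        watermark = eventTime;
      }
      lastDataArrival = now;
    }

    /**
     * Called when advance() finds no message. Without this step the watermark
     * stays at the last event timestamp until new data arrives.
     */
    void onIdle(Instant now) {
      boolean idleLongEnough =
          lastDataArrival == null
              || Duration.between(lastDataArrival, now).compareTo(idleThreshold) >= 0;
      if (idleLongEnough) {
        // Assumed heuristic: treat "now minus the idle threshold" as a safe lower bound.
        Instant candidate = now.minus(idleThreshold);
        if (candidate.isAfter(watermark)) {
          watermark = candidate;
        }
      }
    }

    Instant getWatermark() {
      return watermark;
    }
  }

  /** Mimics AfterWatermark.pastEndOfWindow(): fire once watermark >= window end. */
  static boolean windowFires(Instant windowEnd, Instant watermark) {
    return !watermark.isBefore(windowEnd);
  }

  public static void main(String[] args) {
    Instant t0 = Instant.parse("2019-11-07T16:00:00Z");
    Instant windowEnd = t0.plusSeconds(10); // fixed window [t0, t0 + 10s)
    WatermarkTracker tracker = new WatermarkTracker(Duration.ofSeconds(5));

    // Two messages arrive inside the first window, then the queue goes quiet.
    tracker.onMessage(t0.plusSeconds(1), t0.plusSeconds(1));
    tracker.onMessage(t0.plusSeconds(2), t0.plusSeconds(2));
    System.out.println("after data:  fires=" + windowFires(windowEnd, tracker.getWatermark()));

    // A later advance() call returns nothing; the idle policy moves the watermark on.
    tracker.onIdle(t0.plusSeconds(20));
    System.out.println("after idle:  fires=" + windowFires(windowEnd, tracker.getWatermark()));
  }
}
```

Running `main` prints `fires=false` after the two messages, because the watermark is stuck at the second message's timestamp, and `fires=true` after the idle call. The trade-off of this particular heuristic is that a message arriving after a quiet period with an event timestamp older than the advanced watermark would be treated as late data; whether that is acceptable depends on how the source assigns timestamps.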



--
This message was sent by Atlassian Jira
(v8.3.4#803005)