You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "Kenneth Knowles (JIRA)" <ji...@apache.org> on 2018/01/19 23:00:08 UTC

[jira] [Commented] (BEAM-3225) Non deterministic behaviour of AfterProcessingTime trigger with multiple group by transformations

    [ https://issues.apache.org/jira/browse/BEAM-3225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16332979#comment-16332979 ] 

Kenneth Knowles commented on BEAM-3225:
---------------------------------------

I haven't read the whole trace you posted yet, but I want to highlight this: Only the DataflowRunner and DirectRunner support the "synchronized" part of synchronized processing time. And we basically intend to drop it. We've had some discussions about dangerous behavior that it causes. Essentially, if you choose a repeating processing time trigger, you should expect all your data out in some number of panes from the second GBK, but you can't be too picky about how many.

> Non deterministic behaviour of AfterProcessingTime trigger with multiple group by transformations
> -------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-3225
>                 URL: https://issues.apache.org/jira/browse/BEAM-3225
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core, runner-flink
>    Affects Versions: 2.0.0, 2.1.0
>            Reporter: Pawel Bartoszek
>            Assignee: Kenneth Knowles
>            Priority: Critical
>
> *Context*
> I run my [test code|https://gist.github.com/pbartoszek/fe6b1d7501333a2a4b385a1424f6bd80] against different triggers and runners. My original problem was that when writing to a file sink files weren't always produced in a deterministic way. Please refer to this [BEAM-3151|https://issues.apache.org/jira/browse/BEAM-3151] When I started looking at WriteFiles class I noticed that file sink implementation includes some multiple GroupByKey transformations. Knowing that I wrote my test code that is using multiple GroupByKey transformations to conclude that this is a bit buggy(?) support of After(Synchronised)ProcessingTime triggers by GroupByKey that also influence the file sink. When I run my job using Dataflow runner I was getting expected output.
> *About test code*
> The job is counting how many A and B elements it received within 30 sec windows using Count.perElement. Then I am using GroupByKey to fire every time count has increased.
> Below I outlined the expected standard output: 
> {code:java}
> Let's assume all events are received in the same 30 seconds window.
> A -> After count KV{A, 1} -> Final group by KV{A, [1]}
> A -> After count KV{A, 2} -> Final group by KV{A, [1,2]}
> A -> After count KV{A, 3} -> Final group by KV{A, [1,2,3]}
> B -> After count KV{B, 1} -> Final group by KV{B, [1]}
> With my trigger configuration I would expect that for every new element 'After count' is printed with new value followed by 'Final group by' with new counter. Final group by represents the history of counters then.{code}
>  
> *Problem*
> 'Final group by' trigger doesn't always go off although trigger set up would suggest that. This behaviour is different for different runners and Beam versions. 
>  
> *My observations when using Pubsub*
> Trigger configuration
> {code:java}
> Window.<String>into(FixedWindows.of(standardSeconds(30)))
>                         .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))              
>                         .withAllowedLateness(standardSeconds(5), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>                         .accumulatingFiredPanes())
> {code}
>  
>  Beam 2.0 Flink Runner
> {code:java}
> 2017-11-16T14:51:44.294Z After count KV{A, 1} [2017-11-16T14:51:30.000Z..2017-11-16T14:52:00.000Z)
> 2017-11-16T14:51:53.036Z Received Element A [2017-11-16T14:51:30.000Z..2017-11-16T14:52:00.000Z)
> 2017-11-16T14:51:53.143Z After count KV{A, 2} [2017-11-16T14:51:30.000Z..2017-11-16T14:52:00.000Z)
> 2017-11-16T14:51:53.246Z Final group by KV{A, [1, 2]} [2017-11-16T14:51:30.000Z..2017-11-16T14:52:00.000Z)
> 2017-11-16T14:52:03.522Z Received Element A [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:03.629Z After count KV{A, 1} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:03.732Z Final group by KV{A, [1]} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:07.270Z Received Element A [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:07.372Z After count KV{A, 2} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:07.476Z Final group by KV{A, [1, 2]} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:10.394Z Received Element A [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:10.501Z After count KV{A, 3} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:10.602Z Final group by KV{A, [1, 2, 3]} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:13.296Z Received Element A [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:13.402Z After count KV{A, 4} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:13.507Z Final group by KV{A, [1, 2, 3, 4]} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:14.951Z Received Element A [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z) <--- Expected to see 'After count' after this
> 2017-11-16T14:52:35.320Z Received Element A [2017-11-16T14:52:30.000Z..2017-11-16T14:53:00.000Z)
> 2017-11-16T14:52:35.426Z After count KV{A, 1} [2017-11-16T14:52:30.000Z..2017-11-16T14:53:00.000Z)
> 2017-11-16T14:52:35.532Z Final group by KV{A, [1]} [2017-11-16T14:52:30.000Z..2017-11-16T14:53:00.000Z)
> {code}
> Beam 2.0 Direct Runner
> {code:java}
> 2017-11-16T14:49:34.135Z Received Element A [2017-11-16T14:49:00.000Z..2017-11-16T14:49:30.000Z)
> 2017-11-16T14:49:34.324Z After count KV{A, 1} [2017-11-16T14:49:00.000Z..2017-11-16T14:49:30.000Z) <--- Expected to see 'Final group by' after this
> 2017-11-16T14:49:37.526Z Received Element A [2017-11-16T14:49:30.000Z..2017-11-16T14:50:00.000Z)
> 2017-11-16T14:49:37.535Z After count KV{A, 1} [2017-11-16T14:49:30.000Z..2017-11-16T14:50:00.000Z) <--- Expected to see 'Final group by' after this
> 2017-11-16T14:49:44.287Z Received Element A [2017-11-16T14:49:30.000Z..2017-11-16T14:50:00.000Z)
> 2017-11-16T14:49:44.294Z After count KV{A, 2} [2017-11-16T14:49:30.000Z..2017-11-16T14:50:00.000Z)
> 2017-11-16T14:49:47.991Z Received Element A [2017-11-16T14:49:30.000Z..2017-11-16T14:50:00.000Z) <--- Expected to see 'Final group by' after this
> 2017-11-16T14:49:47.995Z After count KV{A, 3} [2017-11-16T14:49:30.000Z..2017-11-16T14:50:00.000Z) <--- Expected to see 'Final group by' after this
> 2017-11-16T14:50:03.323Z Received Element A [2017-11-16T14:50:00.000Z..2017-11-16T14:50:30.000Z)
> 2017-11-16T14:50:03.328Z After count KV{A, 1} [2017-11-16T14:50:00.000Z..2017-11-16T14:50:30.000Z) <--- Expected to see 'Final group by' after this
> 2017-11-16T14:51:14.309Z Received Element A [2017-11-16T14:51:00.000Z..2017-11-16T14:51:30.000Z)
> 2017-11-16T14:51:14.315Z After count KV{A, 1} [2017-11-16T14:51:00.000Z..2017-11-16T14:51:30.000Z) <--- Expected to see 'Final group by' after this
> {code}
>  Beam 2.1 Flink Runner
> {code:java}
> 2017-11-16T15:13:02.747Z After count KV{A, 1} [2017-11-16T15:12:30.000Z..2017-11-16T15:13:00.000Z)
> 2017-11-16T15:13:02.761Z Final group by KV{A, [1]} [2017-11-16T15:12:30.000Z..2017-11-16T15:13:00.000Z)
> 2017-11-16T15:13:09.492Z Received Element A [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z)
> 2017-11-16T15:13:09.501Z After count KV{A, 1} [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z)
> 2017-11-16T15:13:09.608Z Final group by KV{A, [1]} [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z)
> 2017-11-16T15:13:13.029Z Received Element A [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z)
> 2017-11-16T15:13:13.134Z After count KV{A, 2} [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z)
> 2017-11-16T15:13:13.240Z Final group by KV{A, [1, 2]} [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z)
> 2017-11-16T15:13:15.420Z Received Element A [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z) <--- Expected to see 'After count' after this
> 2017-11-16T15:13:38.285Z Received Element A [2017-11-16T15:13:30.000Z..2017-11-16T15:14:00.000Z)
> 2017-11-16T15:13:38.379Z After count KV{A, 1} [2017-11-16T15:13:30.000Z..2017-11-16T15:14:00.000Z)
> 2017-11-16T15:13:38.481Z Final group by KV{A, [1]} [2017-11-16T15:13:30.000Z..2017-11-16T15:14:00.000Z)
> {code}
> Beam 2.1 Direct Runner
> {code:java}
> 2017-11-16T15:17:38.485Z Received Element A [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:38.595Z After count KV{A, 1} [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:38.608Z Final group by KV{A, [1]} [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:44.977Z Received Element A [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:44.985Z After count KV{A, 2} [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:44.988Z Final group by KV{A, [1, 2]} [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:51.126Z Received Element A [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z) <--- Expected to see 'After count' after this
> 2017-11-16T15:17:57.154Z Received Element A [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:57.154Z After count KV{A, 3} [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:57.154Z Received Element A [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:57.158Z After count KV{A, 4} [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:57.161Z After count KV{A, 5} [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:57.163Z Final group by KV{A, [1, 2, 3, 4, 5]} [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> {code}
> Trigger configuration
> {code:java}
> Window.<String>into(FixedWindows.of(standardSeconds(30)))
>                         .triggering(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
> )              
>                         .withAllowedLateness(standardSeconds(5), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>                         .accumulatingFiredPanes())
> {code}
> Beam 2.0 Flink Runner
> {code:java}
> 2017-11-16T14:54:14.017Z Received Element A [2017-11-16T14:54:00.000Z..2017-11-16T14:54:30.000Z)
> 2017-11-16T14:54:14.187Z After count KV{A, 1} [2017-11-16T14:54:00.000Z..2017-11-16T14:54:30.000Z)
> 2017-11-16T14:54:14.205Z Final group by KV{A, [1]} [2017-11-16T14:54:00.000Z..2017-11-16T14:54:30.000Z)
> 2017-11-16T14:54:38.499Z Received Element A [2017-11-16T14:54:30.000Z..2017-11-16T14:55:00.000Z)
> 2017-11-16T14:54:38.604Z After count KV{A, 1} [2017-11-16T14:54:30.000Z..2017-11-16T14:55:00.000Z)
> 2017-11-16T14:54:38.709Z Final group by KV{A, [1]} [2017-11-16T14:54:30.000Z..2017-11-16T14:55:00.000Z)
> 2017-11-16T14:54:42.665Z Received Element A [2017-11-16T14:54:30.000Z..2017-11-16T14:55:00.000Z)
> 2017-11-16T14:54:42.770Z After count KV{A, 2} [2017-11-16T14:54:30.000Z..2017-11-16T14:55:00.000Z) <--- Expected to see 'Final group by' after this (Although I can see the output for next minute already the final group by trigger is lost for this 30 s windows possibly forever?
> 2017-11-16T14:55:06.131Z Received Element A [2017-11-16T14:55:00.000Z..2017-11-16T14:55:30.000Z)
> 2017-11-16T14:55:06.237Z After count KV{A, 1} [2017-11-16T14:55:00.000Z..2017-11-16T14:55:30.000Z)
> 2017-11-16T14:55:06.342Z Final group by KV{A, [1]} [2017-11-16T14:55:00.000Z..2017-11-16T14:55:30.000Z)
> {code}
> Beam 2.1 Flink Runner
> {code:java}
> 2017-11-16T15:11:09.666Z Received Element A [2017-11-16T15:11:00.000Z..2017-11-16T15:11:30.000Z)
> 2017-11-16T15:11:09.838Z After count KV{A, 1} [2017-11-16T15:11:00.000Z..2017-11-16T15:11:30.000Z)
> 2017-11-16T15:11:09.853Z Final group by KV{A, [1]} [2017-11-16T15:11:00.000Z..2017-11-16T15:11:30.000Z)
> 2017-11-16T15:11:14.208Z Received Element A [2017-11-16T15:11:00.000Z..2017-11-16T15:11:30.000Z) <--- Expected to see 'Final group by' after this
> 2017-11-16T15:11:33.216Z Received Element A [2017-11-16T15:11:30.000Z..2017-11-16T15:12:00.000Z)
> 2017-11-16T15:11:33.322Z After count KV{A, 1} [2017-11-16T15:11:30.000Z..2017-11-16T15:12:00.000Z)
> 2017-11-16T15:11:33.327Z Final group by KV{A, [1]} [2017-11-16T15:11:30.000Z..2017-11-16T15:12:00.000Z)
> 2017-11-16T15:11:54.740Z Received Element A [2017-11-16T15:11:30.000Z..2017-11-16T15:12:00.000Z)
> 2017-11-16T15:11:54.843Z After count KV{A, 2} [2017-11-16T15:11:30.000Z..2017-11-16T15:12:00.000Z)
> 2017-11-16T15:11:54.947Z Final group by KV{A, [1, 2]} [2017-11-16T15:11:30.000Z..2017-11-16T15:12:00.000Z)
> {code}
> *My observations when using Kinesis Stream*
> Trigger configuration
> {code:java}
> Window.<String>into(FixedWindows.of(standardSeconds(30)))
>                         .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))              
>                         .withAllowedLateness(standardSeconds(5), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>                         .accumulatingFiredPanes())
> {code}
> Beam 2.1 Direct Runner
> {code:java}
> 2017-11-16T10:56:33.241Z A
> 2017-11-16T10:56:33.460Z After count KV{A, 1} [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
> 2017-11-16T10:56:33.475Z Final group by KV{A, [1]} [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
> 2017-11-16T10:56:37.916Z B
> 2017-11-16T10:56:37.950Z After count KV{B, 1} [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
> 2017-11-16T10:56:37.956Z Final group by KV{B, [1]} [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
> 2017-11-16T10:57:05.388Z After count KV{A, 1} [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
> 2017-11-16T10:57:05.388Z After count KV{B, 1} [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
> 2017-11-16T10:57:05.392Z Final group by KV{B, [1, 1]} [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
> 2017-11-16T10:57:05.392Z Final group by KV{A, [1, 1]} [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
> {code}
> Beam 2.0 Direct Runner
> {code:java}
> 2017-11-16T10:55:11.851Z A
> 2017-11-16T10:55:11.854Z After count KV{A, 1} [2017-11-16T10:55:00.000Z..2017-11-16T10:55:30.000Z) --
> 2017-11-16T10:55:18.329Z B
> 2017-11-16T10:55:18.333Z After count KV{B, 1} [2017-11-16T10:55:00.000Z..2017-11-16T10:55:30.000Z)
> 2017-11-16T10:55:35.195Z After count KV{A, 1} [2017-11-16T10:55:00.000Z..2017-11-16T10:55:30.000Z)
> 2017-11-16T10:55:35.196Z After count KV{B, 1} [2017-11-16T10:55:00.000Z..2017-11-16T10:55:30.000Z)
> 2017-11-16T10:55:35.199Z Final group by KV{A, [1, 1]} [2017-11-16T10:55:00.000Z..2017-11-16T10:55:30.000Z)
> 2017-11-16T10:55:35.199Z Final group by KV{B, [1, 1]} [2017-11-16T10:55:00.000Z..2017-11-16T10:55:30.000Z)
> {code}
> Beam 2.0 Flink Runner
> {code:java}
> 2017-11-16T11:00:04.820Z A
> 2017-11-16T11:00:04.838Z After count KV{A, 1} [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
> 2017-11-16T11:00:04.943Z Final group by KV{A, [1]} [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
> 2017-11-16T11:00:10.105Z B
> 2017-11-16T11:00:10.138Z After count KV{B, 1} [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
> 2017-11-16T11:00:30.188Z Final group by KV{B, [1]} [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
> 2017-11-16T11:00:35.190Z After count KV{A, 1} [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
> 2017-11-16T11:00:35.191Z After count KV{B, 1} [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
> 2017-11-16T11:00:35.295Z Final group by KV{A, [1, 1]} [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z) <--- Why Final group by is triggered only after allowed lateness at 11:00:35?
> 2017-11-16T11:00:35.297Z Final group by KV{B, [1, 1]} [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
> 2017-11-16T11:00:35.298Z Final group by KV{A, []} [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z) <-- allowed lateness configuration dictates that only non empty panes should be trigger!!!
> 2017-11-16T11:00:35.298Z Final group by KV{B, []} [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
> {code}
> Trigger configuration
> {code:java}
> Window.<String>into(FixedWindows.of(standardSeconds(30)))
>                         .triggering(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
> )              
>                         .withAllowedLateness(standardSeconds(5), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>                         .accumulatingFiredPanes())
> {code}
> Beam 2.1 Direct Runner
> {code:java}
> 2017-11-16T11:14:36.754Z A
> 2017-11-16T11:14:36.912Z After count KV{A, 1} [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
> 2017-11-16T11:14:36.925Z Final group by KV{A, [1]} [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
> 2017-11-16T11:14:45.431Z B
> 2017-11-16T11:14:45.437Z After count KV{B, 1} [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
> 2017-11-16T11:14:45.440Z Final group by KV{B, [1]} [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
> 2017-11-16T11:15:00.034Z After count KV{A, 1} [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
> 2017-11-16T11:15:00.035Z After count KV{B, 1} [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
> 2017-11-16T11:15:00.039Z Final group by KV{A, [1, 1]} [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
> 2017-11-16T11:15:00.040Z Final group by KV{B, [1, 1]} [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
> {code}
> Beam 2.0 Direct Runner
> {code:java}
> 2017-11-16T11:05:12.562Z A
> 2017-11-16T11:05:12.565Z After count KV{A, 1} [2017-11-16T11:05:00.000Z..2017-11-16T11:05:30.000Z)
> 2017-11-16T11:05:15.326Z B
> 2017-11-16T11:05:15.330Z After count KV{B, 1} [2017-11-16T11:05:00.000Z..2017-11-16T11:05:30.000Z)
> 2017-11-16T11:05:30.456Z After count KV{A, 1} [2017-11-16T11:05:00.000Z..2017-11-16T11:05:30.000Z)
> 2017-11-16T11:05:30.457Z After count KV{B, 1} [2017-11-16T11:05:00.000Z..2017-11-16T11:05:30.000Z)
> 2017-11-16T11:05:30.459Z Final group by KV{A, [1, 1]} [2017-11-16T11:05:00.000Z..2017-11-16T11:05:30.000Z)
> 2017-11-16T11:05:30.459Z Final group by KV{B, [1, 1]} [2017-11-16T11:05:00.000Z..2017-11-16T11:05:30.000Z)
> {code}
> Beam 2.0 Flink Runner - run 1
> {code:java}
> 2017-11-16T11:06:32.634Z A
> 2017-11-16T11:06:32.797Z After count KV{A, 1} [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:06:32.812Z Final group by KV{A, [1]} [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:06:36.449Z B
> 2017-11-16T11:06:36.550Z After count KV{B, 1} [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:06:36.654Z Final group by KV{B, [1]} [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:07:00.153Z After count KV{A, 1} [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:07:00.155Z After count KV{B, 1} [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:07:00.258Z Final group by KV{A, [1, 1]} [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:07:00.260Z Final group by KV{B, [1, 1]} [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:07:00.262Z Final group by KV{A, [1, 1]} [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:07:00.263Z Final group by KV{B, [1, 1]} [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> {code}
> Beam 2.0 Flink Runner - run 2
> {code:java}
> 2017-11-16T11:09:41.538Z A
> 2017-11-16T11:09:41.618Z After count KV{A, 1} [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
> 2017-11-16T11:09:41.687Z Final group by KV{A, [1]} [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
> 2017-11-16T11:09:45.671Z B
> 2017-11-16T11:09:45.681Z After count KV{B, 1} [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z) <--- Expected to see 'Final group by' after this
> 2017-11-16T11:10:00.102Z After count KV{A, 1} [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
> 2017-11-16T11:10:00.103Z After count KV{B, 1} [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
> 2017-11-16T11:10:00.201Z Final group by KV{B, [1, 1]} [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
> 2017-11-16T11:10:00.202Z Final group by KV{A, [1, 1]} [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
> 2017-11-16T11:10:00.203Z Final group by KV{B, [1, 1]} [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
> {code}
> Beam 2.0 Flink Runner - run 3
> {code:java}
> 2017-11-16T11:08:09.474Z A
> 2017-11-16T11:08:09.558Z After count KV{A, 1} [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z)
> 2017-11-16T11:08:09.646Z Final group by KV{A, [1]} [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z)
> 2017-11-16T11:08:14.541Z B
> 2017-11-16T11:08:30.255Z After count KV{A, 1} [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z) <--- Expected to see 'Final group by' after this
> 2017-11-16T11:08:30.256Z After count KV{B, 1} [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z)
> 2017-11-16T11:08:30.307Z Final group by KV{A, [1, 1]} [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z)
> 2017-11-16T11:08:30.309Z Final group by KV{A, [1, 1]} [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z)
> 2017-11-16T11:08:30.310Z Final group by KV{B, [1]} [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z)
> {code}
> Beam 2.0 Flink Runner - run 4
> {code:java}
> 2017-11-16T11:07:44.814Z A
> 2017-11-16T11:07:44.841Z After count KV{A, 1} [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z) <--- Expected to see 'Final group by' after this
> 2017-11-16T11:07:48.883Z B
> 2017-11-16T11:07:48.901Z After count KV{B, 1} [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z) <--- Expected to see 'Final group by' after this
> 2017-11-16T11:08:00.099Z After count KV{A, 1} [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z) <--- Expected to see 'Final group by' after this
> 2017-11-16T11:08:00.101Z After count KV{B, 1} [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z) <--- Expected to see 'Final group by' after this
> 2017-11-16T11:08:00.204Z Final group by KV{A, [1, 1]} [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z)
> 2017-11-16T11:08:00.205Z Final group by KV{B, [1, 1]} [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z)
> 2017-11-16T11:08:00.206Z Final group by KV{A, [1, 1]} [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z)
> 2017-11-16T11:08:00.207Z Final group by KV{B, [1, 1]} [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z)
> {code}
>  
>  *Workaround*
> If you check test code I redefined a first trigger just before second group by key transformations and I was started getting expected result on my local machine using both direct runner and flink runner. However when I deployed job to Flink cluster the Final group by trigger didn't go off sometimes.
>  
> *My intuition*
>  I guess that there is some bug with handling After(Synchronised)ProcessingTime triggers in Beam. AfterWartermark trigger always works as expected. It's very interesting that AfterProcessingTime triggers are going off at different times when comparing Beam 2.0 and 2.1.
>  I am a bit worried that this bug might be still in Beam 2.2 although not occurring that frequently.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)