Posted to commits@beam.apache.org by "Pawel Bartoszek (JIRA)" <ji...@apache.org> on 2018/03/16 14:43:00 UTC

[jira] [Created] (BEAM-3863) AfterProcessingTime trigger doesn't fire reliably

Pawel Bartoszek created BEAM-3863:
-------------------------------------

             Summary: AfterProcessingTime trigger doesn't fire reliably
                 Key: BEAM-3863
                 URL: https://issues.apache.org/jira/browse/BEAM-3863
             Project: Beam
          Issue Type: Bug
          Components: sdk-java-core
    Affects Versions: 2.3.0, 2.2.0, 2.1.0
            Reporter: Pawel Bartoszek
            Assignee: Kenneth Knowles


*Issue*

The Beam AfterProcessingTime trigger does not always fire after the configured delay.

The job below configures a trigger that should fire when the watermark passes the end of the window, then every 5 seconds for late data, and finally at the end of the allowed lateness.

 

*Expected behaviour*

A late firing should occur 5 seconds (in processing time) after the first late record arrives in the pane.
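
To make the expectation concrete, here is a minimal sketch in plain Java (no Beam dependency) that models when late panes should fire for a single key and window. The class LateFiringSketch and the method expectedLateFirings are hypothetical names introduced for illustration only. The model assumes the late trigger re-arms after each firing and ignores the allowed-lateness cutoff; it is not how the runner implements timers.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of Beam: models expected late firings for one key and window. */
final class LateFiringSketch {

    /**
     * @param lateArrivalsMillis processing times at which late records arrive, ascending
     * @param delayMillis        the delay passed to plusDelayOf(...), in milliseconds
     * @return processing times at which a late pane is expected to fire
     */
    static List<Long> expectedLateFirings(List<Long> lateArrivalsMillis, long delayMillis) {
        List<Long> firings = new ArrayList<>();
        long pendingFiring = -1; // -1 means no timer is currently set
        for (long arrival : lateArrivalsMillis) {
            // Assumption: a record arriving exactly at the firing time lands in the next pane.
            if (pendingFiring >= 0 && arrival >= pendingFiring) {
                firings.add(pendingFiring); // the timer fired before this record arrived
                pendingFiring = -1;
            }
            if (pendingFiring < 0) {
                // First record in a new pane: set the timer relative to its arrival.
                pendingFiring = arrival + delayMillis;
            }
        }
        if (pendingFiring >= 0) {
            firings.add(pendingFiring); // the last pending timer still has to fire
        }
        return firings;
    }

    public static void main(String[] args) {
        // Late records at 1 s, 2 s and 9 s with a 5 s delay.
        System.out.println(expectedLateFirings(List.of(1_000L, 2_000L, 9_000L), 5_000L));
    }
}
```

Under this model, the single late record sent per key in the test should produce exactly one late firing, 5 seconds after that record arrives.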

 

*Actual behaviour*

In my tests, late firings work for some keys but not for others; which keys are affected appears to be random. The DummySource generates 15 distinct keys (AA, BB, ..., PP). For each key it sends 5 on-time records and one late record. When a late firing is missed, the pane does not fire until the allowed lateness period expires.

*Job code*
{code:java}
// Run on Flink with a parallelism of 8.
String[] runnerArgs = {"--runner=FlinkRunner", "--parallelism=8"};

FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(runnerArgs).as(FlinkPipelineOptions.class);
Pipeline pipeline = Pipeline.create(options);
// DummySource emits 5 on-time records and one late record per key.
PCollection<String> apply = pipeline.apply(Read.from(new DummySource()))
        .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
                // On-time firing when the watermark passes the end of the window.
                .triggering(AfterWatermark.pastEndOfWindow()
                        // Late firing 5 seconds after the first late record in the pane.
                        .withLateFirings(
                                AfterProcessingTime
                                        .pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))))
                .accumulatingFiredPanes()
                // Final firing when the allowed lateness ends, only if the pane is non-empty.
                .withAllowedLateness(Duration.standardMinutes(2), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
        );
// Count occurrences per element and log each fired pane.
apply.apply(Count.perElement())
        .apply(ParDo.of(new DoFn<KV<String, Long>, Long>() {
            @ProcessElement
            public void process(ProcessContext context, BoundedWindow window) {
                LOG.info("Count: {}. For window {}, Pane {}", context.element(), window, context.pane());
            }
        }));

pipeline.run().waitUntilFinish();{code}
 

*How can you replicate the issue?*

I've created a GitHub repo with the code shown above. Please see the README file for details on how to replicate the issue.

 

 

 


