Posted to commits@beam.apache.org by "huangjianhuang (JIRA)" <ji...@apache.org> on 2018/01/18 03:39:00 UTC

[jira] [Issue Comment Deleted] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner

     [ https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

huangjianhuang updated BEAM-3414:
---------------------------------
    Comment: was deleted

(was: Thanks for the help. Does this bug appear in other runners? Or can you advise which runner behaves most like the DirectRunner? My code works fine with the DirectRunner but runs into many problems with the FlinkRunner :()

> AfterProcessingTime trigger issue with Flink Runner
> ---------------------------------------------------
>
>                 Key: BEAM-3414
>                 URL: https://issues.apache.org/jira/browse/BEAM-3414
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core, runner-flink
>    Affects Versions: 2.2.0
>         Environment: idea, ubuntu 16.04, FlinkRunner
>            Reporter: huangjianhuang
>            Assignee: Kenneth Knowles
>            Priority: Major
>
> In my demo, I read data from Kafka, count it globally, and finally output the total count of received data, as follows:
> {code:java}
>         FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
>                 .as(FlinkPipelineOptions.class);
>         options.setStreaming(true);
>         options.setRunner(FlinkRunner.class);
>         Pipeline pipeline = Pipeline.create(options);
>         pipeline
>                 .apply("Read from kafka",
>                         KafkaIO.<String, String>read()
> //                                .withTimestampFn(kafkaData -> TimeUtil.timeMillisToInstant(kafkaData.getKey()))
>                                 .withBootstrapServers("localhost:9092")
>                                 .withTopic("recharge")
>                                 .withKeyDeserializer(StringDeserializer.class)
>                                 .withValueDeserializer(StringDeserializer.class)
>                                 .withoutMetadata()
>                 )
>                 .apply(Values.create())
>                 .apply(Window.<String>into(new GlobalWindows())
>                                 .triggering(Repeatedly.forever(
>                                         AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))))
>                                 .accumulatingFiredPanes()
>                 )
>                 .apply(Count.globally())
>                 .apply("output",
>                         ParDo.of(new DoFn<Long, Void>() {
>                             @ProcessElement
>                             public void process(ProcessContext context) {
>                                 System.out.println("---get at: " + Instant.now() + "------");
>                                 System.out.println(context.element());
>                             }
>                         }));
> {code}
> The result should be displayed about 5 seconds after I send the first data, but sometimes nothing is displayed after I send data. The output I got in one test is shown below:
> (I can't upload a picture, so it is described as text.)
> {code:java}
> Send 681Msg at: 2018-01-05T06:34:31.436
> 	---get at: 2018-01-05T06:34:36.668Z------
> 	681
> Send 681Msg at: 2018-01-05T06:34:47.166
> 	---get at: 2018-01-05T06:34:52.284Z------
> 	1362
> Send 681Msg at: 2018-01-05T06:34:55.505
> Send 681Msg at: 2018-01-05T06:35:22.068
> 	---get at: 2018-01-05T06:35:22.112Z------
> 	2044
> {code}
> By the way, the code works fine with the DirectRunner.
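
The expectation in the report (output roughly 5 seconds after the first element of each pane) can be checked against the logged send times with a small model. The following is a minimal sketch that does not use Beam at all. The class `ExpectedFirings`, the method `expectedFireTimes`, and the simplification that each 681-message batch arrives at a single instant are assumptions made for illustration, not part of the original report or of any runner's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: models when a repeated "first element in pane + delay"
// processing-time trigger would be expected to fire. Not Beam code.
class ExpectedFirings {

    // arrivalsMillis must be sorted ascending; each entry is one arrival instant.
    static List<Long> expectedFireTimes(List<Long> arrivalsMillis, long delayMillis) {
        List<Long> fires = new ArrayList<>();
        long pendingFire = -1; // -1 means no timer has been set yet
        for (long arrival : arrivalsMillis) {
            if (pendingFire >= 0 && arrival < pendingFire) {
                continue; // arrives before the pending timer fires: joins the current pane
            }
            // First element of a new pane: set the timer at arrival + delay.
            pendingFire = arrival + delayMillis;
            fires.add(pendingFire);
        }
        return fires;
    }

    public static void main(String[] args) {
        // Send times from the log, as millisecond offsets from 06:34:31.436
        // (assumption: each batch is treated as arriving at its send time).
        List<Long> sends = Arrays.asList(0L, 15_730L, 24_069L, 50_632L);
        System.out.println(expectedFireTimes(sends, 5_000L));
        // prints [5000, 20730, 29069, 55632]
    }
}
```

Under this simplified model, output would be expected at about 06:34:36.436, 06:34:52.166, 06:35:00.505 and 06:35:27.068. The log shows outputs close to the first two, none near 06:35:00.505, and one at 06:35:22.112, immediately after the fourth send.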



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)