Posted to commits@beam.apache.org by "Kenneth Knowles (JIRA)" <ji...@apache.org> on 2018/01/18 04:42:00 UTC
[jira] [Assigned] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner
[ https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kenneth Knowles reassigned BEAM-3414:
-------------------------------------
Assignee: Aljoscha Krettek (was: Kenneth Knowles)
> AfterProcessingTime trigger issue with Flink Runner
> ---------------------------------------------------
>
> Key: BEAM-3414
> URL: https://issues.apache.org/jira/browse/BEAM-3414
> Project: Beam
> Issue Type: Bug
> Components: runner-core, runner-flink
> Affects Versions: 2.2.0
> Environment: idea, ubuntu 16.04, FlinkRunner
> Reporter: huangjianhuang
> Assignee: Aljoscha Krettek
> Priority: Major
>
> In my demo, I read data from Kafka, count the elements globally, and finally output the total count of received data, as follows:
> {code:java}
> FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
>         .as(FlinkPipelineOptions.class);
> options.setStreaming(true);
> options.setRunner(FlinkRunner.class);
> Pipeline pipeline = Pipeline.create(options);
> pipeline
>         .apply("Read from kafka",
>                 KafkaIO.<String, String>read()
>                         // .withTimestampFn(kafkaData -> TimeUtil.timeMillisToInstant(kafkaData.getKey()))
>                         .withBootstrapServers("localhost:9092")
>                         .withTopic("recharge")
>                         .withKeyDeserializer(StringDeserializer.class)
>                         .withValueDeserializer(StringDeserializer.class)
>                         .withoutMetadata()
>         )
>         .apply(Values.create())
>         .apply(Window.<String>into(new GlobalWindows())
>                 .triggering(Repeatedly.forever(
>                         AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))))
>                 .accumulatingFiredPanes()
>         )
>         .apply(Count.globally())
>         .apply("output",
>                 ParDo.of(new DoFn<Long, Void>() {
>                     @ProcessElement
>                     public void process(ProcessContext context) {
>                         System.out.println("---get at: " + Instant.now() + "------");
>                         System.out.println(context.element());
>                     }
>                 }));
> {code}
> The result should be displayed 5 seconds after I send the first data, but sometimes nothing is displayed after I send data. The outputs I got in one test are below (I can't upload a picture, so they are given as text):
> {code:java}
> Send 681Msg at: 2018-01-05T06:34:31.436
> ---get at: 2018-01-05T06:34:36.668Z------
> 681
> Send 681Msg at: 2018-01-05T06:34:47.166
> ---get at: 2018-01-05T06:34:52.284Z------
> 1362
> Send 681Msg at: 2018-01-05T06:34:55.505
> Send 681Msg at: 2018-01-05T06:35:22.068
> ---get at: 2018-01-05T06:35:22.112Z------
> 2044
> {code}
> By the way, the code works fine with the direct runner.
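For reference, the firing times that the description expects can be sketched in plain Java, without Beam. This is only a model of the documented trigger semantics (the first element in a pane starts a 5-second processing-time timer, and the trigger re-arms after each firing), not the runner's implementation. The class and method names (`ExpectedFirings`, `expectedFirings`) are hypothetical, each "Send 681Msg" batch is treated as a single arrival, and the send times are assumed to be UTC because the log prints them without a zone.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Beam: models when
// Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay))
// would be expected to fire for a list of arrival times in ascending order.
final class ExpectedFirings {

    static List<Instant> expectedFirings(List<Instant> arrivals, Duration delay) {
        List<Instant> firings = new ArrayList<>();
        Instant pending = null; // firing time of the currently open pane, if any
        for (Instant arrival : arrivals) {
            if (pending != null && !arrival.isBefore(pending)) {
                // The timer elapsed before this element arrived, so the pane fired.
                firings.add(pending);
                pending = null;
            }
            if (pending == null) {
                // First element of a new pane: arm the timer.
                pending = arrival.plus(delay);
            }
        }
        if (pending != null) {
            firings.add(pending); // the last open pane fires once its timer elapses
        }
        return firings;
    }

    public static void main(String[] args) {
        // Send times copied from the log in the description (assumed UTC).
        List<Instant> sends = List.of(
                Instant.parse("2018-01-05T06:34:31.436Z"),
                Instant.parse("2018-01-05T06:34:47.166Z"),
                Instant.parse("2018-01-05T06:34:55.505Z"),
                Instant.parse("2018-01-05T06:35:22.068Z"));
        for (Instant firing : expectedFirings(sends, Duration.ofSeconds(5))) {
            System.out.println("expected firing at: " + firing);
        }
    }
}
```

Under these assumptions the model expects a firing about 5 seconds after the third send (around 06:35:00.505), whereas the log above shows no output between the third send and 06:35:22.112, just after the fourth send.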
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)