Posted to user@flink.apache.org by John Tipper <jo...@hotmail.com> on 2019/06/08 13:19:24 UTC

How are timestamps treated within an iterative DataStream loop within Flink?

Hi All,

How are timestamps treated within an iterative DataStream loop within Flink?

For example, here is a simple iterative loop within Flink where the feedback stream has a different type from the input stream:

DataStream<MyInput> inputStream = env.addSource(new MyInputSourceFunction());
IterativeStream.ConnectedIterativeStreams<MyInput, MyFeedback> iterativeStream = inputStream.iterate().withFeedbackType(MyFeedback.class);
// define an output tag so we can emit feedback objects via a side output
final OutputTag<MyFeedback> outputTag = new OutputTag<MyFeedback>("feedback-output"){};
// now do some processing
SingleOutputStreamOperator<MyOutput> combinedStreams = iterativeStream.process(new CoProcessFunction<MyInput, MyFeedback, MyOutput>() {
    @Override
    public void processElement1(MyInput value, Context ctx, Collector<MyOutput> out) throws Exception {
        // do some processing of the stream of MyInput values
        // emit MyOutput values downstream by calling out.collect()
        out.collect(someInstanceOfMyOutput);
    }

    @Override
    public void processElement2(MyFeedback value, Context ctx, Collector<MyOutput> out) throws Exception {
        // do some more processing on the feedback classes
        // emit feedback items
        ctx.output(outputTag, someInstanceOfMyFeedback);
    }
});

iterativeStream.closeWith(combinedStreams.getSideOutput(outputTag));

My questions are about how Flink uses timestamps within a feedback loop:

  *   Within the ConnectedIterativeStreams, how does Flink treat ordering of the input objects across the streams of regular inputs and feedback objects? If I emit an object into the feedback loop, when will it be seen by the head of the loop with respect to the regular stream of input objects?
  *   How does the behaviour change when using event time processing?

Many thanks,

John

Question also posted to StackOverflow here: https://stackoverflow.com/questions/56506020/how-does-flink-treat-timestamps-within-iterative-loops


Re: How are timestamps treated within an iterative DataStream loop within Flink?

Posted by Yun Gao <yu...@aliyun.com>.
Hi John,

    On the whole, I think Flink currently has no special mechanism for event time in iterations. This means the IterationHead treats the initial input and the feedback input as two normal inputs and uses the same mechanism as tasks outside the iteration.

    This may cause event-time disorder inside the iteration. Event time relies on the watermark alignment mechanism, in which a watermark marks the minimum event time of the records that follow it. Suppose we have a watermark with event time 10. The iteration head will first receive the watermark from the initial input, and then receive it again from the feedback input after the first round of iteration. The IterationHead will then consider the watermarks aligned at event time 10 and emit a watermark with event time 10 to the final output, which signals that it will no longer receive or emit records whose event time is less than 10. However, since records may iterate for multiple rounds, the IterationHead may still receive records whose event time is less than 10 in later rounds of iteration. That is when the event-time disorder occurs.
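
To make the scenario above concrete, here is a minimal sketch in plain Java, with no Flink dependency. The class `TwoInputWatermarkTracker` and its methods are hypothetical names invented for this illustration and are not Flink APIs. It only assumes that a two-input operator advances its own watermark to the minimum of the watermarks seen on its inputs, and it follows Flink's convention that a record whose timestamp is at or below the current watermark counts as late.

```java
// Illustrative simulation only; this is not how Flink implements the iteration head.
final class WatermarkAlignmentSketch {

    /** Hypothetical stand-in for the watermark bookkeeping of a two-input operator. */
    static final class TwoInputWatermarkTracker {
        private long regularInputWatermark = Long.MIN_VALUE;  // input 1: initial input
        private long feedbackInputWatermark = Long.MIN_VALUE; // input 2: feedback input
        private long alignedWatermark = Long.MIN_VALUE;       // watermark the operator has emitted

        /** Records a watermark on input 1 or 2 and returns the operator's aligned watermark. */
        long onWatermark(int input, long timestamp) {
            if (input == 1) {
                regularInputWatermark = Math.max(regularInputWatermark, timestamp);
            } else {
                feedbackInputWatermark = Math.max(feedbackInputWatermark, timestamp);
            }
            // Alignment: the operator may only advance to the minimum across both inputs.
            long min = Math.min(regularInputWatermark, feedbackInputWatermark);
            alignedWatermark = Math.max(alignedWatermark, min);
            return alignedWatermark;
        }

        /** A record is late if its timestamp is not beyond the watermark already emitted. */
        boolean isLate(long recordTimestamp) {
            return recordTimestamp <= alignedWatermark;
        }
    }

    public static void main(String[] args) {
        TwoInputWatermarkTracker head = new TwoInputWatermarkTracker();
        head.onWatermark(1, 10L);                // watermark 10 arrives from the initial input
        long aligned = head.onWatermark(2, 10L); // the same watermark comes back via the feedback edge
        System.out.println("aligned watermark: " + aligned);
        // A record with event time 7 that is still circulating now arrives behind the watermark.
        System.out.println("feedback record with timestamp 7 is late: " + head.isLate(7L));
    }
}
```

In this sketch the head advances to watermark 10 as soon as the feedback edge echoes it, so any record with a smaller timestamp that takes another round through the loop arrives behind the watermark.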

Best,
Yun Gao

