You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Congxian Qiu(klion26) (JIRA)" <ji...@apache.org> on 2019/07/12 08:01:00 UTC
[jira] [Commented] (FLINK-13148) Expose
WindowedStream.sideOutputLateData() from CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-13148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16883601#comment-16883601 ]
Congxian Qiu(klion26) commented on FLINK-13148:
-----------------------------------------------
Hi, [~kkl0u] what do you think about this?
> Expose WindowedStream.sideOutputLateData() from CoGroupedStreams
> ----------------------------------------------------------------
>
> Key: FLINK-13148
> URL: https://issues.apache.org/jira/browse/FLINK-13148
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Reporter: Congxian Qiu(klion26)
> Assignee: Congxian Qiu(klion26)
> Priority: Major
>
> As FLINK-10050 supported {{alloedLateness}}, but we can not get the side output containing the late data, this issue wants to fix it.
> For implementation, I want to add an input parameter {{OutputTag}} in {{WithWindow}} as following
> {code:java}
> protected WithWindow(DataStream<T1> input1,
> DataStream<T2> input2,
> KeySelector<T1, KEY> keySelector1,
> KeySelector<T2, KEY> keySelector2,
> TypeInformation<KEY> keyType,
> WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
> Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
> Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
> Time allowedLateness,
> OutputTage<TaggedUnion<T1, T2>> outputTag) {
> ...
> }
> {code}
> and add a function sideOutputLateData(OutputTag<T> outputTag) in {{WithWindow}}
> {code:java}
> public WithWindow<T1, T2, KEY, W> sideOutputLateData(OutputTag<TaggedUnion<T1, T2>> outputTag) {
> ...
> }
> {code}
> In {{WithWindow#apply}} will add outputTag if it is not null
> {code:java}
> public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInfomation<T> resultType) {
> ...
> if (outputTag != null) {
> windowedStream.sideOutputLateData(outputTag);
> }
> ...
> }{code}
> The same will apply to {{JoinedStreams}}
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)