Posted to issues@flink.apache.org by "Yun Gao (Jira)" <ji...@apache.org> on 2022/11/08 05:57:00 UTC

[jira] [Commented] (FLINK-22587) Support aggregations in batch mode with DataStream API

    [ https://issues.apache.org/jira/browse/FLINK-22587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17630193#comment-17630193 ] 

Yun Gao commented on FLINK-22587:
---------------------------------

Hi, sorry for forgetting to update here. After some more experiments, we found that it works to use an event-time window assigner that puts all records into the same window [0, +Inf): [https://github.com/apache/flink-ml/blob/master/flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/EndOfStreamWindows.java]

With this assigner, the join

{code:java}
source1.join(source2)
        .where(a -> a.f0)
        .equalTo(b -> b.f0)
        .window(new EndOfStreamWindows())
        .apply(xx); // xx: the user's JoinFunction
{code}

works for both bounded stream processing and batch processing. The records do not need event-time timestamps, and no watermarks need to be generated: window assignment does not depend on event time, and the window is triggered by the Long.MAX_VALUE watermark emitted at the end of the stream.
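The reasoning above (assignment ignores timestamps, and only the end-of-input watermark fires the window) can be illustrated with a small, dependency-free Java model. This is not Flink code: `EndOfStreamJoinModel` and its methods are hypothetical names, and the model assumes only the event-time trigger rule of firing once the watermark reaches the window's max timestamp, which for a `TimeWindow` is `end - 1`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model (NOT Flink code) of a keyed join whose single window spans
 * [Long.MIN_VALUE, Long.MAX_VALUE) and fires like an event-time trigger.
 */
class EndOfStreamJoinModel {
    // Like a TimeWindow ending at Long.MAX_VALUE, the max timestamp is end - 1.
    static final long WINDOW_MAX_TIMESTAMP = Long.MAX_VALUE - 1;

    // Buffered values per key for each input; no element timestamp is ever consulted.
    private final Map<String, List<Integer>> left = new LinkedHashMap<>();
    private final Map<String, List<Integer>> right = new LinkedHashMap<>();
    private final List<String> output = new ArrayList<>();
    private boolean fired = false;

    void addLeft(String key, int value) {
        left.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    void addRight(String key, int value) {
        right.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    /** Event-time trigger rule: fire once the watermark reaches the window's max timestamp. */
    void onWatermark(long watermark) {
        if (fired || watermark < WINDOW_MAX_TIMESTAMP) {
            return; // ordinary watermarks never reach the end of the all-time window
        }
        fired = true;
        // Emit every (left, right) pair that shares a key, as a windowed join would.
        for (Map.Entry<String, List<Integer>> e : left.entrySet()) {
            List<Integer> matches = right.get(e.getKey());
            if (matches == null) {
                continue;
            }
            for (int a : e.getValue()) {
                for (int b : matches) {
                    output.add(e.getKey() + ":" + a + "," + b);
                }
            }
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        EndOfStreamJoinModel join = new EndOfStreamJoinModel();
        join.addLeft("k1", 1);
        join.addLeft("k2", 2);
        join.addRight("k1", 10);
        join.addRight("k1", 11);
        join.addRight("k3", 30);
        join.onWatermark(1_000L);          // ordinary watermark: nothing fires
        System.out.println(join.output()); // []
        join.onWatermark(Long.MAX_VALUE);  // end-of-input watermark
        System.out.println(join.output()); // [k1:1,10, k1:1,11]
    }
}
```

Keys `k2` and `k3` produce no output because they have no partner on the other input; everything else is held back until the final watermark.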

We will still try to propose a proper fix for this issue. One option is to not require a window in this case: if no window is set, all records would by default be assigned to a single window.

> Support aggregations in batch mode with DataStream API
> ------------------------------------------------------
>
>                 Key: FLINK-22587
>                 URL: https://issues.apache.org/jira/browse/FLINK-22587
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.12.0, 1.13.0
>            Reporter: Etienne Chauchot
>            Priority: Major
>
> A pipeline like this *in batch mode* would output no data
> {code:java}
> stream.join(otherStream)
>     .where(<KeySelector>)
>     .equalTo(<KeySelector>)
>     .window(GlobalWindows.create())
>     .apply(<JoinFunction>)
> {code}
> Indeed, the default trigger for GlobalWindows is NeverTrigger, which never fires. If we set an _EventTimeTrigger_, it fires on every element, because in batch mode the watermark is set to +INF and therefore passes the end of the global window with each new element. A _ProcessingTimeTrigger_ never fires either, and none of the elapsed-time or delta-based triggers are suited for batch.
> The same goes for _reduce()_ instead of _join()_.
> So it seems something is missing from batch support in the DataStream API.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)