Posted to user@flink.apache.org by Piotr Nowojski <pn...@apache.org> on 2021/02/01 10:20:21 UTC
Re: Comment in source code of CoGroupedStreams
Hi Sudharsan,
Sorry for the somewhat late response, but as far as I can tell, this comment
refers to this piece of code:
public void apply(KEY key, W window, Iterable<TaggedUnion<T1, T2>> values, Collector<T> out)
        throws Exception {
    // Buffer each side of the co-group in its own in-memory list.
    List<T1> oneValues = new ArrayList<>();
    List<T2> twoValues = new ArrayList<>();
    for (TaggedUnion<T1, T2> val : values) {
        if (val.isOne()) {
            oneValues.add(val.getOne());
        } else {
            twoValues.add(val.getTwo());
        }
    }
    // The user's CoGroupFunction is called only after both lists are fully built.
    wrappedFunction.coGroup(oneValues, twoValues, out);
}
from
org.apache.flink.streaming.api.datastream.CoGroupedStreams.CoGroupWindowFunction#apply
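To make the buffering concrete, here is a minimal sketch of the same loop outside Flink. `Tagged` is a hypothetical stand-in for Flink's internal `TaggedUnion` (it is not that class), and `SplitSketch.split` is an illustrative name. The only point it shows is that every element of the window's `Iterable` is copied into one of two lists before any co-group logic runs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Flink's TaggedUnion: holds a value from either the
// first or the second input. Like the loop it mirrors, a null "one" value is
// treated as belonging to the second input, so null elements are not supported.
final class Tagged<A, B> {
    private final A one;
    private final B two;

    private Tagged(A one, B two) {
        this.one = one;
        this.two = two;
    }

    static <A, B> Tagged<A, B> one(A value) { return new Tagged<>(value, null); }
    static <A, B> Tagged<A, B> two(B value) { return new Tagged<>(null, value); }

    boolean isOne() { return one != null; }
    A getOne() { return one; }
    B getTwo() { return two; }
}

final class SplitSketch {
    // Same shape as the quoted apply(): drain the Iterable into two lists.
    // Both lists must fit on the heap before the co-group logic is invoked.
    static <A, B> void split(Iterable<Tagged<A, B>> values, List<A> oneValues, List<B> twoValues) {
        for (Tagged<A, B> val : values) {
            if (val.isOne()) {
                oneValues.add(val.getOne());
            } else {
                twoValues.add(val.getTwo());
            }
        }
    }

    public static void main(String[] args) {
        List<Tagged<String, Integer>> window = Arrays.asList(
                Tagged.<String, Integer>one("a"),
                Tagged.<String, Integer>two(1),
                Tagged.<String, Integer>one("b"));
        List<String> ones = new ArrayList<>();
        List<Integer> twos = new ArrayList<>();
        split(window, ones, twos);
        System.out.println(ones + " " + twos); // prints "[a, b] [1]"
    }
}
```

Memory used by `split` grows linearly with the number of elements the window holds for that key, which is what the Javadoc note warns about.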
You are right that WindowOperator uses the state backend to store the elements,
but at the very least this function (there might be a reason why it does
this eagerly) appears to assemble the co-grouped elements into two separate
ArrayLists on the heap before handing them over to the `CoGroupFunction`.
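A window function does not necessarily have to copy its input like this. The sketch below (plain Java, all names hypothetical) contrasts a single pass over an `Iterable` that keeps only a counter with an eager copy into an `ArrayList` like the one in the quoted `apply()`. It uses a lazily generated `Iterable` so the single-pass version never holds all elements at once; whether the `Iterable` Flink actually passes to a window function is itself fully in memory depends on the state backend and is not modeled here.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

final class WindowReadSketch {

    // Hypothetical lazy input: yields "item0" .. "item(n-1)" without storing them.
    static Iterable<String> generate(int n) {
        return () -> new Iterator<String>() {
            private int next = 0;

            @Override
            public boolean hasNext() {
                return next < n;
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return "item" + next++;
            }
        };
    }

    // Single pass: extra memory is one counter, independent of group size.
    static long countWithPrefix(Iterable<String> values, String prefix) {
        long count = 0;
        for (String v : values) {
            if (v.startsWith(prefix)) {
                count++;
            }
        }
        return count;
    }

    // Eager copy, as in the quoted apply(): extra memory grows with group size.
    static List<String> materialize(Iterable<String> values) {
        List<String> copy = new ArrayList<>();
        for (String v : values) {
            copy.add(v);
        }
        return copy;
    }

    public static void main(String[] args) {
        System.out.println(countWithPrefix(generate(1000), "item9")); // prints 111
        System.out.println(materialize(generate(5)).size());          // prints 5
    }
}
```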
Best,
Piotrek
On Thu, 21 Jan 2021 at 20:38, sudranga <su...@gmail.com> wrote:
> Is this comment in the file
>
> flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
> accurate?
>
> " * <p>Note: Right now, the groups are being built in memory so you need to
> * ensure that they don't get too big. Otherwise the JVM might crash."
>
> Looking at the source code of CoGroupedStreams, I see that it simply does a
> map and a union, and then the data is assigned to the appropriate windows. I
> assumed that the elements in the window itself are persisted using my
> configured state backend (and that appends do not need to read the entire
> list state).
>
> I ask because I tried setting a uid on my coGroup operator as shown below,
> and this results in a compilation error (no uid method available?):
>
> firstStream
>   .coGroup(secondStream)
>   .where(_.id)
>   .equalTo(_.id)
>   .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>   .apply(new MyCogroupFunction())
>   .uid("myCogroup")
>
>
> Is the comment referring to having enough memory on the read side? If so,
> isn't this true for any window process function?
>
> Thanks
> Sudharsan
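The pipeline described in the question (map each input, union the two, then key and window the result) can be modeled in plain Java to see where the per-key groups come from. This is a sketch under stated assumptions, not Flink code: `CoGroupPipelineModel`, `Tagged` and `coGroupByKey` are hypothetical names, every element is assumed to fall into a single window, and parallel partitioning by key is not modeled. Per the reply above, Flink keeps the window contents in the WindowOperator's state backend rather than in a `HashMap`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model (no Flink dependency) of the coGroup pipeline:
// tag ("map"), concatenate ("union"), then group by key ("keyBy + window").
final class CoGroupPipelineModel {

    // Hypothetical tagged record: side is 1 or 2, key is the extracted join key.
    static final class Tagged<K> {
        final int side;
        final K key;
        final Object value;

        Tagged(int side, K key, Object value) {
            this.side = side;
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return side + ":" + value;
        }
    }

    static <T1, T2, K> Map<K, List<Tagged<K>>> coGroupByKey(
            List<T1> first, Function<T1, K> firstKey,
            List<T2> second, Function<T2, K> secondKey) {
        // "map" tags each element with its side and key; "union" appends both inputs.
        List<Tagged<K>> union = new ArrayList<>();
        for (T1 e : first) {
            union.add(new Tagged<>(1, firstKey.apply(e), e));
        }
        for (T2 e : second) {
            union.add(new Tagged<>(2, secondKey.apply(e), e));
        }
        // "keyBy + window": bucket the combined elements by key (single window assumed).
        Map<K, List<Tagged<K>>> byKey = new LinkedHashMap<>();
        for (Tagged<K> t : union) {
            byKey.computeIfAbsent(t.key, k -> new ArrayList<>()).add(t);
        }
        return byKey;
    }

    public static void main(String[] args) {
        Map<String, List<Tagged<String>>> groups = coGroupByKey(
                Arrays.asList("a1", "b1"), s -> s.substring(0, 1),
                Arrays.asList("a2"), s -> s.substring(0, 1));
        System.out.println(groups); // prints {a=[1:a1, 2:a2], b=[1:b1]}
    }
}
```

Each bucket mixes elements from both inputs, which is why a later step (the quoted `apply()`) has to split them by side again before calling the `CoGroupFunction`.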