Posted to user@flink.apache.org by Juan Rodríguez Hortalá <ju...@gmail.com> on 2016/11/09 05:35:02 UTC
An idea for a parallel AllWindowedStream
Hi,
As a self-training exercise, I've defined a class extending WindowedStream
as a proof of concept for a parallel version of
AllWindowedStream:
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.windows.Window;

/**
 * Tries to create a parallel version of an AllWindowedStream for a DataStream
 * by creating a KeyedStream that uses as key the hash of each element modulo
 * a parallelism level.
 *
 * This only makes sense for window assigners that ensure the subwindows will be
 * in sync, like time-based window assigners, and it is more stable with ingestion
 * time and event time because the window alignment is more reliable.
 * This doesn't work for count or session window assigners.
 *
 * Also note that elements from different partitions might get out of order due
 * to parallelism.
 */
public static class ParAllWindowedStream<T, W extends Window> extends WindowedStream<T, Integer, W> {
    private final transient WindowAssigner<Object, W> windowAssigner;

    public ParAllWindowedStream(DataStream<T> stream, final int parallelism,
                                WindowAssigner<Object, W> windowAssigner) {
        super(stream.keyBy(new KeySelector<T, Integer>() {
                  @Override
                  public Integer getKey(T value) throws Exception {
                      // hashCode() can be negative: Math.abs of the remainder keeps
                      // the key in [0, parallelism)
                      return Math.abs(value.hashCode() % parallelism);
                  }
              }),
              windowAssigner);
        this.windowAssigner = windowAssigner;
    }

    @Override
    public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reduceFun) {
        return super.reduce(reduceFun)       // reduce each subwindow in parallel
                .windowAll(windowAssigner)   // synchronize the subwindow results per window
                .reduce(reduceFun);          // sequential aggregate of the partial results
    }

    // Cannot override apply() because we need an additional reduce function of type R
    // to recombine the subwindow results for each window
    public <R> SingleOutputStreamOperator<R> applyPar(ReduceFunction<T> reduceFunction,
                                                      WindowFunction<T, R, Integer, W> function,
                                                      ReduceFunction<R> reduceWindowsFunction) {
        return super.apply(reduceFunction, function)
                .windowAll(windowAssigner)
                .reduce(reduceWindowsFunction);
    }
}
Maybe someone will find this interesting. For the curious, I have a toy example program at
https://github.com/juanrh/flink-state-eviction/blob/05676ca0eebf83e936b5cc04ecf85e8110ccacf4/src/main/java/com/github/juanrh/streaming/windowAllPoCs/WindowAllTimeKeyedPoC.java
Greetings,
Juan
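The reduce override in ParAllWindowedStream computes each window in two steps: a partial reduce per hash-derived key, then a single windowAll(...).reduce(...) over the partial results. That matches a plain all-window reduce only when the reduce function is associative and commutative, because partial results from different keys reach the combining step in no guaranteed order. The sketch below simulates both paths in plain Java, without Flink, assuming tumbling windows numbered by timestamp / windowSize; the Event class and all method names are hypothetical and are not Flink API.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

/**
 * Plain-Java simulation (no Flink) of the two-phase scheme behind ParAllWindowedStream.reduce.
 * Assumption: tumbling windows, identified by timestamp / windowSize.
 */
class TwoPhaseWindowReduce {

    /** Hypothetical stand-in for a stream record with an event timestamp. */
    static final class Event {
        final long timestamp;
        final int value;
        Event(long timestamp, int value) { this.timestamp = timestamp; this.value = value; }
    }

    /** Same keying as the proposal: hash modulo parallelism, kept non-negative. */
    static int keyOf(int value, int parallelism) {
        return Math.abs(Integer.hashCode(value) % parallelism);
    }

    /** Reference result: one sequential reduce over all values of each window. */
    static Map<Long, Integer> sequential(List<Event> events, long windowSize, BinaryOperator<Integer> f) {
        Map<Long, Integer> out = new TreeMap<>();
        for (Event e : events) {
            out.merge(e.timestamp / windowSize, e.value, f);
        }
        return out;
    }

    /** Two-phase result: reduce per (window, key), then combine the partials per window. */
    static Map<Long, Integer> twoPhase(List<Event> events, long windowSize, int parallelism,
                                       BinaryOperator<Integer> f) {
        // Phase 1: one partial result per (window, key), like the keyed subwindows.
        Map<Long, Map<Integer, Integer>> partials = new TreeMap<>();
        for (Event e : events) {
            partials.computeIfAbsent(e.timestamp / windowSize, w -> new HashMap<>())
                    .merge(keyOf(e.value, parallelism), e.value, f);
        }
        // Phase 2: combine all partials of the same window, like windowAll(...).reduce(...).
        // HashMap iteration order is unspecified, so f must be associative and commutative.
        Map<Long, Integer> out = new TreeMap<>();
        for (Map.Entry<Long, Map<Integer, Integer>> w : partials.entrySet()) {
            for (int partial : w.getValue().values()) {
                out.merge(w.getKey(), partial, f);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(0, 3), new Event(1, -7), new Event(2, 5),
                new Event(10, 4), new Event(11, -2), new Event(12, 9));
        System.out.println(sequential(events, 10, Integer::sum));   // {0=1, 1=11}
        System.out.println(twoPhase(events, 10, 4, Integer::sum));  // {0=1, 1=11}
    }
}
```

With Integer::sum or Integer::max both paths agree; a non-commutative function, such as string concatenation, could give a different result in the two-phase version.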
Re: An idea for a parallel AllWindowedStream
Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
yes, this works well in many cases, and I was also thinking about adding
something like this to Flink.
There can be problems if you use a trigger other than EventTimeTrigger, one that
may fire multiple times per window, or if you specify an allowed lateness. In
those cases, you would overcount elements in the all-window.
Cheers,
Aljoscha
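To see where the overcounting comes from, assume a trigger that fires without purging (as with TriggerResult.FIRE), so each firing of a keyed subwindow emits its accumulated result so far; a late element arriving within the allowed lateness likewise makes the subwindow fire again with its updated contents. The downstream windowAll(...).reduce(...) then adds up every emission, including earlier ones that have since been superseded. The plain-Java simulation below (no Flink; Firing and both combine methods are hypothetical names) shows a window of 4 elements being counted as 6. The latestPerKeyCombine variant only illustrates one possible mitigation, assuming each key's emissions arrive in firing order; it is not part of the proposal or of Flink.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java simulation (no Flink): each keyed subwindow emits its accumulated count
 * every time it fires, and the all-window step sums every emission it receives.
 */
class RepeatedFiringOvercount {

    /** Hypothetical record for one emission: which key fired and its count so far. */
    static final class Firing {
        final int key;
        final long accumulatedCount;
        Firing(int key, long accumulatedCount) { this.key = key; this.accumulatedCount = accumulatedCount; }
    }

    /** What windowAll(...).reduce(sum) would compute: the sum of all emissions. */
    static long naiveCombine(List<Firing> firings) {
        long total = 0;
        for (Firing f : firings) {
            total += f.accumulatedCount;
        }
        return total;
    }

    /** Hypothetical mitigation: keep only the latest emission per key, then sum. */
    static long latestPerKeyCombine(List<Firing> firings) {
        Map<Integer, Long> latest = new HashMap<>();
        for (Firing f : firings) {
            latest.put(f.key, f.accumulatedCount); // later firings overwrite earlier ones
        }
        long total = 0;
        for (long c : latest.values()) {
            total += c;
        }
        return total;
    }

    public static void main(String[] args) {
        List<Firing> firings = new ArrayList<>();
        firings.add(new Firing(0, 2)); // key 0 fires after its first 2 elements
        firings.add(new Firing(1, 1)); // key 1 fires once with 1 element
        firings.add(new Firing(0, 3)); // key 0 fires again (e.g. a late element) with its full count
        // The window really contains 3 + 1 = 4 elements.
        System.out.println(naiveCombine(firings));        // 6: key 0's first 2 elements counted twice
        System.out.println(latestPerKeyCombine(firings)); // 4
    }
}
```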