Posted to user@flink.apache.org by Juan Rodríguez Hortalá <ju...@gmail.com> on 2016/11/09 05:35:02 UTC

An idea for a parallel AllWindowedStream

Hi,

As a self-training exercise, I've defined a class extending WindowedStream
to implement a proof of concept for a parallel version of
AllWindowedStream:

/**
 * Tries to create a parallel version of an AllWindowedStream for a DataStream
 * by creating a KeyedStream that uses as key the hash of each element modulo
 * a parallelism level.
 *
 * This only makes sense for window assigners that ensure the subwindows will be
 * in sync, like time-based window assigners, and it is more stable with ingestion
 * and event time because the window alignment is more reliable.
 * This doesn't work for counting or session window assigners.
 *
 * Also note that elements from different partitions might get out of order due
 * to parallelism.
 */
public static class ParAllWindowedStream<T, W extends Window> extends WindowedStream<T, Integer, W> {
    private final transient WindowAssigner<Object,W> windowAssigner;

    public ParAllWindowedStream(DataStream<T> stream, final int parallelism,
                                WindowAssigner<Object,W> windowAssigner) {
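        super(stream.keyBy(new KeySelector<T, Integer>() {
                  @Override
                  public Integer getKey(T value) throws Exception {
                      // Note: hashCode() can be negative, so the keys fall in
                      // the open range (-parallelism, parallelism)
                      return value.hashCode() % parallelism;
                  }
              }),
              windowAssigner);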
        this.windowAssigner = windowAssigner;
    }

    @Override
    public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reduceFun) {
        return super.reduce(reduceFun)      // reduce each subwindow
                .windowAll(windowAssigner)  // synchronize
                .reduce(reduceFun);         // sequentially aggregate the subwindow results
    }

    // Cannot override because we need an additional reduce function of type R
    // to recombine the result for each window
    // @Override
    public <R> SingleOutputStreamOperator<R> applyPar(ReduceFunction<T> reduceFunction,
                                                      WindowFunction<T, R, Integer, W> function,
                                                      ReduceFunction<R> reduceWindowsFunction) {
        return super.apply(reduceFunction, function)
                .windowAll(windowAssigner)
                .reduce(reduceWindowsFunction);
    }
}
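
The two-stage reduction can be illustrated without Flink. What follows is a minimal plain-Java sketch for the contents of a single window, not part of the class above: the names TwoStageReduceSketch, bucketOf and twoStageReduce are hypothetical, and the use of Math.floorMod (Java 8+) to keep keys non-negative is an assumption of the sketch rather than what the KeySelector does. The result only matches a direct reduce when the reduce function is associative and commutative, since elements are regrouped by bucket.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java illustration (no Flink dependency) of a two-stage reduce
// over the elements of ONE window. All names here are hypothetical.
class TwoStageReduceSketch {

    // Buckets an element by hash modulo the parallelism level.
    // Math.floorMod keeps the result in [0, parallelism) even when
    // hashCode() is negative; the % operator can return negative values.
    static int bucketOf(Object value, int parallelism) {
        return Math.floorMod(value.hashCode(), parallelism);
    }

    // Stage 1: reduce each bucket (sub-window) independently.
    // Stage 2: reduce the partial results (the windowAll step).
    // Returns null for an empty window.
    static <T> T twoStageReduce(List<T> window, int parallelism, BinaryOperator<T> f) {
        Map<Integer, T> partials = new HashMap<>();
        for (T element : window) {
            // merge stores the element if the bucket is empty,
            // otherwise combines it with the bucket's current partial
            partials.merge(bucketOf(element, parallelism), element, f);
        }
        T result = null;
        for (T partial : partials.values()) {
            result = (result == null) ? partial : f.apply(result, partial);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> window = new ArrayList<>();
        for (int i = -3; i <= 6; i++) {
            window.add(i * 7);
        }
        int direct = window.stream().reduce(Integer::sum).get();
        int staged = twoStageReduce(window, 4, Integer::sum);
        System.out.println("direct=" + direct + " staged=" + staged);
    }
}
```

For a non-commutative reduce function (for example string concatenation) the two results can differ, which is the same out-of-order caveat mentioned in the class comment.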

Maybe someone will find this interesting. I have a toy example program at
https://github.com/juanrh/flink-state-eviction/blob/05676ca0eebf83e936b5cc04ecf85e8110ccacf4/src/main/java/com/github/juanrh/streaming/windowAllPoCs/WindowAllTimeKeyedPoC.java
for the curious.

Greetings,

Juan

Re: An idea for a parallel AllWindowedStream

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
Yes, this works well in some cases, and I was also thinking about adding
something like this to Flink.

There can be problems if you use a trigger other than EventTimeTrigger that
possibly fires multiple times or if you specify an allowed lateness. In
those cases, you would overcount elements in the all-window.
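
The overcounting can be seen in a small simulation. This is a simplified plain-Java model, not Flink code: the names OvercountSketch, SubWindow and downstreamTotal are hypothetical, and it assumes that a sub-window emits its accumulated count on every firing (no purging) and that the downstream all-window adds up every partial it receives.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration (no Flink dependency) of overcounting in the
// all-window when a sub-window fires more than once. Names are hypothetical.
class OvercountSketch {

    // Models one keyed sub-window that counts its elements and emits the
    // accumulated count each time it fires, without clearing its state.
    static final class SubWindow {
        private long count = 0;

        void add() {
            count++;
        }

        long fire() {
            return count;
        }
    }

    // Models the downstream all-window: it sums every partial it receives.
    static long allWindowSum(List<Long> partials) {
        long total = 0;
        for (long partial : partials) {
            total += partial;
        }
        return total;
    }

    // Total seen by the all-window for one sub-window that receives
    // `onTime` elements, fires once, and then fires again after each of
    // `late` late elements.
    static long downstreamTotal(int onTime, int late) {
        SubWindow sub = new SubWindow();
        List<Long> emitted = new ArrayList<>();
        for (int i = 0; i < onTime; i++) {
            sub.add();
        }
        emitted.add(sub.fire());      // regular firing at the end of the window
        for (int i = 0; i < late; i++) {
            sub.add();
            emitted.add(sub.fire());  // each late element causes another firing
        }
        return allWindowSum(emitted);
    }

    public static void main(String[] args) {
        int onTime = 3;
        int late = 1;
        System.out.println("true count = " + (onTime + late));
        System.out.println("all-window total = " + downstreamTotal(onTime, late));
    }
}
```

With a single firing per sub-window the two numbers agree; every additional firing adds the whole accumulated count again.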

Cheers,
Aljoscha
