Posted to user@flink.apache.org by Felipe Gutierrez <fe...@gmail.com> on 2018/03/09 21:07:28 UTC

Implement a sort inside the WindowFunction

Hi all,

I have a word count job using Flink streaming, and my reduce transformation
applies a WindowFunction. I would like this WindowFunction to sort the
output of the reduce. Is that possible? That is, I want to sort the data set
inside the window by key.

Thanks for your ideas!

Here is my code:

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9000)
                .map(new UpperCaserMap())
                .flatMap(new Splitter())
                .keyBy(new SumWordSelect()) // select the first value as a key using the KeySelector class
                .timeWindow(Time.seconds(5)) // use this if Apache Flink server is up
                .reduce(new SumWordsReduce(), new FIlterWindowFunction())
                ;

    public static class ReduceWindowFunction implements WindowFunction<
            Tuple2<String, Integer>, // input type
            Tuple2<String, Integer>, // output type
            String, // key type
            TimeWindow> {

        @Override
        public void apply(String key,
                          TimeWindow window,
                          Iterable<Tuple2<String, Integer>> inputs,
                          Collector<Tuple2<String, Integer>> out) {
            Integer sum = 0;
            for (Tuple2<String, Integer> input : inputs) {
                sum = sum + input.f1;
            }
            out.collect(new Tuple2<>(key, sum));
        }
    }

    public static class FIlterWindowFunction implements WindowFunction<
            Tuple2<String, Integer>, // input type
            Tuple2<String, Integer>, // output type
            String, // key type
            TimeWindow> {

        @Override
        public void apply(String key,
                          TimeWindow window,
                          Iterable<Tuple2<String, Integer>> inputs,
                          Collector<Tuple2<String, Integer>> out) {
            // Integer value = 0;
            for (Tuple2<String, Integer> input : inputs) {
                // if (input.f1 >= 3 && input.f1 > value) value = input.f1;
                out.collect(new Tuple2<>(key, input.f1));
            }
        }
    }



-- 

*---- Felipe Oliveira Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
<https://felipeogutierrez.blogspot.com>*

Re: Implement a sort inside the WindowFunction

Posted by Felipe Gutierrez <fe...@gmail.com>.
Thanks, Fabian.

I am building an example and generating my own fake source
<https://github.com/felipegutierrez/flink-first/blob/master/src/main/java/flink/example/streaming/EventTimeStreamExampleJava.java>
to process in Flink. I am going to implement more things with keys and
event-time processing to understand it better.
I guess non-keyed windows are not used very often, since they do not run in
parallel and the processing cannot be split. But I will implement some
examples with them for practice.

Thanks for your reply,
Felipe

On Thu, Mar 15, 2018 at 6:17 AM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi Felipe,
>
> Just like the ReduceFunction, the WindowFunction is applied in the context
> of a single key. So, it will be called for each key and always just see a
> single record (the reduced record of the key).
> You'd have to add a non-keyed window (windowAll or timeWindowAll) for your
> sorting WindowFunction.
> Note that this function cannot run in parallel.
>
> Best, Fabian
>



Re: Implement a sort inside the WindowFunction

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Felipe,

Just like the ReduceFunction, the WindowFunction is applied in the context
of a single key. So, it will be called for each key and always just see a
single record (the reduced record of the key).
You'd have to add a non-keyed window (windowAll or timeWindowAll) for your
sorting WindowFunction.
Note that this function cannot run in parallel.
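
A minimal sketch of the sorting step described above, written in plain Java
so it runs without a Flink dependency. The class name WindowSortSketch and
the method sortByKey are hypothetical, and Map.Entry stands in for
Tuple2<String, Integer>. In a real job this logic would presumably sit in
the apply method of an AllWindowFunction placed after the keyed reduce; the
wiring shown in the comment is an assumption about the Flink 1.x DataStream
API, not tested code from this thread.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class WindowSortSketch {

    // Stand-in for the body of AllWindowFunction.apply(window, inputs, out).
    // Assumed wiring in the job (not verified here):
    //   keyedAndReduced.timeWindowAll(Time.seconds(5)).apply(new SortAllWindowFunction())
    // The non-keyed window sees the reduced records of ALL keys, so it can
    // buffer them, sort them by key, and emit them in order.
    static List<Map.Entry<String, Integer>> sortByKey(
            Iterable<Map.Entry<String, Integer>> windowContents) {
        List<Map.Entry<String, Integer>> buffer = new ArrayList<>();
        for (Map.Entry<String, Integer> record : windowContents) {
            buffer.add(record); // copy, because the Iterable cannot be sorted in place
        }
        buffer.sort(Map.Entry.comparingByKey()); // lexicographic order on the word
        return buffer;
    }

    public static void main(String[] args) {
        // One already-reduced (word, count) record per key, as produced by the
        // keyed window in the original pipeline.
        List<Map.Entry<String, Integer>> reduced = new ArrayList<>();
        reduced.add(new AbstractMap.SimpleEntry<>("WORLD", 2));
        reduced.add(new AbstractMap.SimpleEntry<>("FLINK", 1));
        reduced.add(new AbstractMap.SimpleEntry<>("HELLO", 3));

        for (Map.Entry<String, Integer> e : sortByKey(reduced)) {
            System.out.println(e.getKey() + " " + e.getValue());
        }
    }
}
```

Because every record of the window has to be collected in one place before
it can be sorted, this step runs with parallelism 1, which is the limitation
mentioned above.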

Best, Fabian
