Posted to user@flink.apache.org by Felipe Gutierrez <fe...@gmail.com> on 2018/03/09 21:07:28 UTC
Implement a sort inside the WindowFunction
Hi all,
I have a word count using Flink streaming, and my reduce transformation
applies a WindowFunction. I would like this WindowFunction to sort the
output of the reduce. Is that possible? That is, I want to sort the data set
inside the window by key.
Thanks for your ideas!
Here is my code:
DataStream<Tuple2<String, Integer>> dataStream = env
        .socketTextStream("localhost", 9000)
        .map(new UpperCaserMap())
        .flatMap(new Splitter())
        .keyBy(new SumWordSelect()) // select the first value as a key using the KeySelector class
        .timeWindow(Time.seconds(5)) // use this if Apache Flink server is up
        .reduce(new SumWordsReduce(), new FIlterWindowFunction());
public static class ReduceWindowFunction implements WindowFunction<
        Tuple2<String, Integer>, // input type
        Tuple2<String, Integer>, // output type
        String,                  // key type
        TimeWindow> {
    // Sums the counts of all records of one key in the window
    // (defined here, but not used in the pipeline above).
    @Override
    public void apply(String key,
                      TimeWindow window,
                      Iterable<Tuple2<String, Integer>> inputs,
                      Collector<Tuple2<String, Integer>> out) {
        int sum = 0;
        for (Tuple2<String, Integer> input : inputs) {
            sum += input.f1;
        }
        out.collect(new Tuple2<>(key, sum));
    }
}
public static class FIlterWindowFunction implements WindowFunction<
        Tuple2<String, Integer>, // input type
        Tuple2<String, Integer>, // output type
        String,                  // key type
        TimeWindow> {
    // Called once per key and window; because it is combined with a reduce,
    // inputs holds only the single reduced record of that key.
    @Override
    public void apply(String key,
                      TimeWindow window,
                      Iterable<Tuple2<String, Integer>> inputs,
                      Collector<Tuple2<String, Integer>> out) {
        // Integer value = 0;
        for (Tuple2<String, Integer> input : inputs) {
            // if (input.f1 >= 3 && input.f1 > value) value = input.f1;
            out.collect(new Tuple2<>(key, input.f1));
        }
    }
}
--
*---- Felipe Oliveira Gutierrez*
*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
<https://felipeogutierrez.blogspot.com>*
Re: Implement a sort inside the WindowFunction
Posted by Felipe Gutierrez <fe...@gmail.com>.
Thanks Fabian,
I am building an example that generates its own fake source
<https://github.com/felipegutierrez/flink-first/blob/master/src/main/java/flink/example/streaming/EventTimeStreamExampleJava.java>
for Flink to process. I am going to implement more things with keys and
event-time processing to understand them better.
I guess non-keyed windows are not used very often, since they do not run in
parallel and the processing cannot be split. But I will implement some
examples with them for practice.
Thanks for your reply,
Felipe
On Thu, Mar 15, 2018 at 6:17 AM, Fabian Hueske <fh...@gmail.com> wrote:
> Hi Felipe,
>
> Just like the ReduceFunction, the WindowFunction is applied in the context
> of a single key. So it is called once per key and window, and always sees
> just a single record (the reduced record of that key).
> You'd have to add a non-keyed window (windowAll) for your sorting
> function, i.e. an AllWindowFunction instead of a WindowFunction.
> Note that this function cannot run in parallel.
>
> Best, Fabian
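Fabian's point, that the WindowFunction passed to reduce() is called once per key and only sees that key's reduced record, can be illustrated without a Flink cluster. The following is a minimal plain-Java model, not Flink code: `KeyedReduceModel` and `windowFunctionInputs` are hypothetical names, `Map.Entry<String, Integer>` stands in for `Tuple2<String, Integer>`, it models a single firing of one window, and it assumes the elided `SumWordsReduce` sums the counts of equal words.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical plain-Java model (no Flink dependency) of one firing of a keyed
 * time window that uses reduce(ReduceFunction, WindowFunction).
 */
public class KeyedReduceModel {

    /**
     * Groups the window's records by word, sums each group (assumed behavior of
     * SumWordsReduce), and returns what the WindowFunction receives for each key.
     */
    public static Map<String, List<Map.Entry<String, Integer>>> windowFunctionInputs(
            List<Map.Entry<String, Integer>> windowContents) {
        // Incremental reduce: one running (word, sum) record per key.
        // TreeMap only makes the output deterministic; Flink promises no order across keys.
        Map<String, Integer> reduced = new TreeMap<>();
        for (Map.Entry<String, Integer> record : windowContents) {
            reduced.merge(record.getKey(), record.getValue(), Integer::sum);
        }
        // On firing, the WindowFunction is invoked once per key, and its
        // Iterable holds only that key's single reduced record.
        Map<String, List<Map.Entry<String, Integer>>> inputsPerCall = new TreeMap<>();
        for (Map.Entry<String, Integer> e : reduced.entrySet()) {
            inputsPerCall.put(e.getKey(),
                    Collections.singletonList(Map.entry(e.getKey(), e.getValue())));
        }
        return inputsPerCall;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> window = new ArrayList<>();
        for (String word : "TO BE OR NOT TO BE".split(" ")) {
            window.add(Map.entry(word, 1));
        }
        // Each call sees a one-element list, so sorting inside it changes nothing.
        windowFunctionInputs(window).forEach((key, inputs) ->
                System.out.println(key + " -> " + inputs));
    }
}
```

Running `main` prints one line per key, such as `TO -> [TO=2]`: every list the window function would receive has exactly one element, so a sort inside it cannot order the keys relative to each other.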
Re: Implement a sort inside the WindowFunction
Posted by Fabian Hueske <fh...@gmail.com>.
Hi Felipe,
Just like the ReduceFunction, the WindowFunction is applied in the context
of a single key. So it is called once per key and window, and always sees
just a single record (the reduced record of that key).
You'd have to add a non-keyed window (windowAll) for your sorting
function, i.e. an AllWindowFunction instead of a WindowFunction.
Note that this function cannot run in parallel.
Best, Fabian
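Following Fabian's suggestion, the sort moves to a non-keyed window placed after the keyed reduce, for example `.timeWindowAll(Time.seconds(5)).apply(...)` with a class implementing `AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>`, whose `apply(window, values, out)` sees the per-key sums of all keys together. Because that needs Flink on the classpath, the sketch below isolates only the sorting step such an `apply` could perform: buffer the Iterable, sort by key, emit in order. It is plain Java under stated assumptions: `SortAllWindowSketch` and `sortByKey` are hypothetical names, `Map.Entry<String, Integer>` stands in for `Tuple2<String, Integer>`, and a `Consumer` stands in for Flink's `Collector`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Hypothetical sketch of the sorting step a non-keyed (all-)window function
 * could perform. Consumer stands in for Flink's Collector.
 */
public class SortAllWindowSketch {

    /** Buffers the window's per-key results, sorts them by key, and emits them in order. */
    public static void sortByKey(Iterable<Map.Entry<String, Integer>> inputs,
                                 Consumer<Map.Entry<String, Integer>> out) {
        // A full sort needs every record of the window, so buffer them first.
        List<Map.Entry<String, Integer>> buffer = new ArrayList<>();
        for (Map.Entry<String, Integer> input : inputs) {
            buffer.add(input);
        }
        buffer.sort(Map.Entry.comparingByKey());
        buffer.forEach(out);
    }

    public static void main(String[] args) {
        // Per-key sums as they might arrive from the keyed reduce (arbitrary order).
        List<Map.Entry<String, Integer>> windowContents = List.of(
                Map.entry("TO", 2), Map.entry("BE", 2),
                Map.entry("OR", 1), Map.entry("NOT", 1));
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        sortByKey(windowContents, emitted::add);
        System.out.println(emitted); // [BE=2, NOT=1, OR=1, TO=2]
    }
}
```

The keyed reduce upstream still runs in parallel; only the already summed per-key records reach the single, non-parallel all-window operator, so the buffer holds at most one record per distinct word in the window.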