Posted to user@flink.apache.org by Swapnil Chougule <th...@gmail.com> on 2016/09/14 07:04:47 UTC

Tumbling window rich functionality

Hi Team,

I am using a tumbling window with a window size of 5 minutes.
I want to perform setup and teardown for each window. I tried
using RichWindowFunction, but it didn't work for me.
Can anybody tell me how I can do this?

Here is the code snippet I tried:

impressions.map(new LineItemAdUnitAggr())
        .keyBy(0)
        .timeWindow(Time.seconds(300))
        .apply(new RichWindowFunction<Tuple2<Tuple2<Integer, Integer>, Long>, Boolean, Tuple, TimeWindow>() {

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                //setup method
            }

            @Override
            public void apply(Tuple key, TimeWindow window,
                    Iterable<Tuple2<Tuple2<Integer, Integer>, Long>> input,
                    Collector<Boolean> out) throws Exception {
                //do processing
            }

            @Override
            public void close() throws Exception {
                //tear down method
                super.close();
            }
        });

Thanks,
Swapnil

Re: Tumbling window rich functionality

Posted by Robert Metzger <rm...@apache.org>.
Hi,
apply() will be called once for each key in each window.
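
To make the per-key behaviour concrete, here is a minimal plain-Java sketch that models keyed tumbling windows without using Flink at all. The names Event, windowStart and applyCalls are hypothetical and exist only for this illustration. The sketch assumes non-negative timestamps and windows aligned to 0, and it only shows how many apply() invocations you would expect, not how Flink implements windowing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of keyed tumbling windows. It does not use Flink. */
final class KeyedWindowSketch {

    /** Hypothetical input record: a key plus a timestamp in milliseconds. */
    static final class Event {
        final String key;
        final long timestampMs;

        Event(String key, long timestampMs) {
            this.key = key;
            this.timestampMs = timestampMs;
        }
    }

    /** Start of the tumbling window containing the timestamp (non-negative timestamps assumed). */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Returns one entry per (window, key) pair, i.e. one entry per modelled apply() call. */
    static List<String> applyCalls(List<Event> events, long sizeMs) {
        // TreeMaps keep the result ordered by window start, then by key.
        Map<Long, Map<String, Integer>> counts = new TreeMap<>();
        for (Event e : events) {
            counts.computeIfAbsent(windowStart(e.timestampMs, sizeMs), w -> new TreeMap<>())
                  .merge(e.key, 1, Integer::sum);
        }
        List<String> calls = new ArrayList<>();
        for (Map.Entry<Long, Map<String, Integer>> window : counts.entrySet()) {
            for (Map.Entry<String, Integer> perKey : window.getValue().entrySet()) {
                calls.add("apply(key=" + perKey.getKey()
                        + ", windowStart=" + window.getKey()
                        + ", elements=" + perKey.getValue() + ")");
            }
        }
        return calls;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("a", 1_000L), new Event("b", 2_000L),
                new Event("a", 3_000L), new Event("a", 12_000L));
        // 10-second windows: keys a and b share the first window, only a is in the second.
        for (String call : applyCalls(events, 10_000L)) {
            System.out.println(call);
        }
    }
}
```

With the four sample events and a 10-second window, the model reports three calls: two for the first window (keys a and b) and one for the second window (key a).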

On Wed, Oct 12, 2016 at 2:25 PM, Swapnil Chougule <th...@gmail.com>
wrote:

> Thanks Aljoscha.
>
> When I use WindowFunction.apply() on a keyed stream, will apply() be called
> once per window, or multiple times (once for each key in that windowed
> stream)?
>
> Ex:
> DataStream<Boolean> dataStream = env
>                 .socketTextStream("localhost", 9999)
>                 .flatMap(new Splitter())
>                 .keyBy(0)
>                 .timeWindow(Time.seconds(10))
>                 .apply(new WindowFunction<Tuple2<String, Integer>, Boolean, Tuple, TimeWindow>() {
>
>                     @Override
>                     public void apply(Tuple key, TimeWindow window,
>                             Iterable<Tuple2<String, Integer>> input,
>                             Collector<Boolean> out) throws Exception {
>                      //Some business logic
>                     }
>                 });
>
> Regards,
> Swapnil

Re: Tumbling window rich functionality

Posted by Swapnil Chougule <th...@gmail.com>.
Thanks Aljoscha.

When I use WindowFunction.apply() on a keyed stream, will apply() be called
once per window, or multiple times (once for each key in that windowed
stream)?

Ex:
DataStream<Boolean> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(10))
                .apply(new WindowFunction<Tuple2<String, Integer>, Boolean, Tuple, TimeWindow>() {

                    @Override
                    public void apply(Tuple key, TimeWindow window,
                            Iterable<Tuple2<String, Integer>> input,
                            Collector<Boolean> out) throws Exception {
                     //Some business logic
                    }
                });

Regards,
Swapnil

On Wed, Sep 14, 2016 at 2:26 PM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
> WindowFunction.apply() is called once for each window, so you should
> be able to do the setup/teardown in there. open() and close() are called once
> at the start and once at the end of processing, respectively.
>
> Cheers,
> Aljoscha

Re: Tumbling window rich functionality

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
WindowFunction.apply() is called once for each window, so you should be
able to do the setup/teardown in there. open() and close() are called once at
the start and once at the end of processing, respectively, not per window.
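
As a rough illustration of that call order, the following plain-Java sketch models the lifecycle without any Flink classes. LoggingWindowFunction and run are hypothetical names for this example only, and the driver loop is an assumption about the order of calls as described above, not Flink's actual runtime code. Per-window setup and teardown sit inside apply(), with the teardown in a finally block so that it also runs if processing throws.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of the open / apply / close call order. It does not use Flink. */
final class WindowLifecycleSketch {

    /** Hypothetical stand-in for a rich window function; it records the order of calls. */
    static final class LoggingWindowFunction {
        final List<String> log = new ArrayList<>();

        void open() {
            log.add("open");   // once, before any window is processed
        }

        void close() {
            log.add("close");  // once, after the last window
        }

        void apply(long windowStart, List<Integer> input) {
            log.add("setup " + windowStart);          // per-window setup
            try {
                int sum = 0;
                for (int value : input) {
                    sum += value;
                }
                log.add("process " + windowStart + " sum=" + sum);
            } finally {
                log.add("teardown " + windowStart);   // per-window teardown, even on failure
            }
        }
    }

    /** Drives the function: open once, apply once per window, close once. */
    static List<String> run(List<List<Integer>> windows, long sizeMs) {
        LoggingWindowFunction fn = new LoggingWindowFunction();
        fn.open();
        try {
            long start = 0;
            for (List<Integer> window : windows) {
                fn.apply(start, window);
                start += sizeMs;
            }
        } finally {
            fn.close();
        }
        return fn.log;
    }

    public static void main(String[] args) {
        // Two 5-minute windows (300000 ms each) with sample contents.
        List<List<Integer>> windows = Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3));
        for (String line : run(windows, 300_000L)) {
            System.out.println(line);
        }
    }
}
```

For the two sample windows the log contains a single open and a single close, with one setup/process/teardown triple per window in between.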

Cheers,
Aljoscha
