You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Swapnil Chougule <th...@gmail.com> on 2016/09/14 07:04:47 UTC
Tumbling window rich functionality
Hi Team,
I am using tumbling window functionality having window size 5 minutes.
I want to perform setup & teardown functionality for each window. I tried
using RichWindowFunction but it didn't work for me.
Can anybody tell me how can I do it ?
Attaching code snippet what I tried
impressions.map(new
LineItemAdUnitAggr()).keyBy(0).timeWindow(Time.seconds(300)).apply(new
RichWindowFunction<Tuple2<Tuple2<Integer,Integer>,Long>, Boolean, Tuple,
TimeWindow>() {
@Override
public void open(Configuration parameters) throws Exception
{
super.open(parameters);
//setup method
}
public void apply(Tuple key, TimeWindow window,
Iterable<Tuple2<Tuple2<Integer, Integer>, Long>>
input,
Collector<Boolean> out) throws Exception {
//do processing
}
@Override
public void close() throws Exception {
//tear down method
super.close();
}
});
Thanks,
Swapnil
Re: Tumbling window rich functionality
Posted by Robert Metzger <rm...@apache.org>.
Hi,
apply() will be called for each key.
On Wed, Oct 12, 2016 at 2:25 PM, Swapnil Chougule <th...@gmail.com>
wrote:
> Thanks Aljoscha.
>
> Whenever I am using WindowFunction.apply() on keyed stream, apply() will
> be called once or multiple times (equal to number of keys in that windowed
> stream)?
>
> Ex:
> DataStream<Boolean> dataStream = env
> .socketTextStream("localhost", 9999)
> .flatMap(new Splitter())
> .keyBy(0)
> .timeWindow(Time.seconds(10))
> .apply(new WindowFunction<Tuple2<String,Integer>,
> Boolean, Tuple, TimeWindow>() {
>
> @Override
> public void apply(Tuple key, TimeWindow window,
> Iterable<Tuple2<String, Integer>> input,
> Collector<Boolean> out) throws Exception {
> //Some business logic
> }
> });
>
> Regards,
> Swapnil
>
> On Wed, Sep 14, 2016 at 2:26 PM, Aljoscha Krettek <al...@apache.org>
> wrote:
>
>> Hi,
>> WindowFunction.apply() will be called once for each window so you should
>> be able to do the setup/teardown in there. open() and close() are called at
>> the start of processing, end of processing, respectively.
>>
>> Cheers,
>> Aljoscha
>>
>> On Wed, 14 Sep 2016 at 09:04 Swapnil Chougule <th...@gmail.com>
>> wrote:
>>
>>> Hi Team,
>>>
>>> I am using tumbling window functionality having window size 5 minutes.
>>> I want to perform setup & teardown functionality for each window. I
>>> tried using RichWindowFunction but it didn't work for me.
>>> Can anybody tell me how can I do it ?
>>>
>>> Attaching code snippet what I tried
>>>
>>> impressions.map(new LineItemAdUnitAggr()).keyBy(0)
>>> .timeWindow(Time.seconds(300)).apply(new RichWindowFunction<Tuple2<Tuple2<Integer,Integer>,Long>,
>>> Boolean, Tuple, TimeWindow>() {
>>>
>>> @Override
>>> public void open(Configuration parameters) throws
>>> Exception {
>>> super.open(parameters);
>>> //setup method
>>> }
>>>
>>> public void apply(Tuple key, TimeWindow window,
>>> Iterable<Tuple2<Tuple2<Integer, Integer>,
>>> Long>> input,
>>> Collector<Boolean> out) throws Exception {
>>> //do processing
>>> }
>>>
>>> @Override
>>> public void close() throws Exception {
>>> //tear down method
>>> super.close();
>>> }
>>> });
>>>
>>> Thanks,
>>> Swapnil
>>>
>>
>
Re: Tumbling window rich functionality
Posted by Swapnil Chougule <th...@gmail.com>.
Thanks Aljoscha.
Whenever I am using WindowFunction.apply() on keyed stream, apply() will be
called once or multiple times (equal to number of keys in that windowed
stream)?
Ex:
DataStream<Boolean> dataStream = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.keyBy(0)
.timeWindow(Time.seconds(10))
.apply(new WindowFunction<Tuple2<String,Integer>, Boolean,
Tuple, TimeWindow>() {
@Override
public void apply(Tuple key, TimeWindow window,
Iterable<Tuple2<String, Integer>> input,
Collector<Boolean> out) throws Exception {
//Some business logic
}
});
Regards,
Swapnil
On Wed, Sep 14, 2016 at 2:26 PM, Aljoscha Krettek <al...@apache.org>
wrote:
> Hi,
> WindowFunction.apply() will be called once for each window so you should
> be able to do the setup/teardown in there. open() and close() are called at
> the start of processing, end of processing, respectively.
>
> Cheers,
> Aljoscha
>
> On Wed, 14 Sep 2016 at 09:04 Swapnil Chougule <th...@gmail.com>
> wrote:
>
>> Hi Team,
>>
>> I am using tumbling window functionality having window size 5 minutes.
>> I want to perform setup & teardown functionality for each window. I tried
>> using RichWindowFunction but it didn't work for me.
>> Can anybody tell me how can I do it ?
>>
>> Attaching code snippet what I tried
>>
>> impressions.map(new LineItemAdUnitAggr()).keyBy(0)
>> .timeWindow(Time.seconds(300)).apply(new RichWindowFunction<Tuple2<Tuple2<Integer,Integer>,Long>,
>> Boolean, Tuple, TimeWindow>() {
>>
>> @Override
>> public void open(Configuration parameters) throws
>> Exception {
>> super.open(parameters);
>> //setup method
>> }
>>
>> public void apply(Tuple key, TimeWindow window,
>> Iterable<Tuple2<Tuple2<Integer, Integer>, Long>>
>> input,
>> Collector<Boolean> out) throws Exception {
>> //do processing
>> }
>>
>> @Override
>> public void close() throws Exception {
>> //tear down method
>> super.close();
>> }
>> });
>>
>> Thanks,
>> Swapnil
>>
>
Re: Tumbling window rich functionality
Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
WindowFunction.apply() will be called once for each window so you should be
able to do the setup/teardown in there. open() and close() are called at
the start of processing, end of processing, respectively.
Cheers,
Aljoscha
On Wed, 14 Sep 2016 at 09:04 Swapnil Chougule <th...@gmail.com>
wrote:
> Hi Team,
>
> I am using tumbling window functionality having window size 5 minutes.
> I want to perform setup & teardown functionality for each window. I tried
> using RichWindowFunction but it didn't work for me.
> Can anybody tell me how can I do it ?
>
> Attaching code snippet what I tried
>
> impressions.map(new
> LineItemAdUnitAggr()).keyBy(0).timeWindow(Time.seconds(300)).apply(new
> RichWindowFunction<Tuple2<Tuple2<Integer,Integer>,Long>, Boolean, Tuple,
> TimeWindow>() {
>
> @Override
> public void open(Configuration parameters) throws
> Exception {
> super.open(parameters);
> //setup method
> }
>
> public void apply(Tuple key, TimeWindow window,
> Iterable<Tuple2<Tuple2<Integer, Integer>, Long>>
> input,
> Collector<Boolean> out) throws Exception {
> //do processing
> }
>
> @Override
> public void close() throws Exception {
> //tear down method
> super.close();
> }
> });
>
> Thanks,
> Swapnil
>