Posted to user@flink.apache.org by Vishnu Viswanath <vi...@gmail.com> on 2016/03/12 19:19:53 UTC

Behavior of SlidingProcessingTimeWindow with CountTrigger

Hi All,


I have the following code:


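import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger

val sev = StreamExecutionEnvironment.getExecutionEnvironment
val socTextStream = sev.socketTextStream("localhost", 4444)

// split lines into words, pair each word with 1 and key by the word
val counts = socTextStream.flatMap { _.split("\\s") }
  .map { (_, 1) }
  .keyBy(0)
  // 20 s windows starting every 10 s, so each element lands in two windows
  .window(SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10)))
  // fire whenever a window has received 5 elements
  .trigger(CountTrigger.of(5))
  .sum(1)

counts.print()
sev.execute()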

I am sending messages to port 4444 using nc -lk 4444.
This is my sample input:

a
a
a
a
a
b
b
b
b
b
c
c
c
c
c
d
d
d
d
d
e
e
e
e
e

I am sending five of each letter since I have a CountTrigger of 5. I was
expecting that for every five characters the code would print a count of 5,
i.e., (a,5), (b,5), etc. But the output I am getting is a little confusing.
Output:

1> (a,5)
1> (a,5)
1> (b,5)
2> (c,5)
2> (c,5)
1> (d,5)
1> (e,5)
1> (e,5)

As you can see, for some characters the count is printed twice (a, c, e) and
for others it is printed only once (b, d). I am not able to figure out what
is going on. I think it may have something to do with the
SlidingProcessingTimeWindows assigner, but I am not sure.
Can someone explain what is going on?


Thanks and Regards,
Vishnu Viswanath
www.vishnuviswanath.com

Re: Behavior of SlidingProcessingTimeWindow with CountTrigger

Posted by Vishnu Viswanath <vi...@gmail.com>.
Hi Aljoscha,

Thank you for the explanation and the link to the IBM InfoSphere documentation.
That explains why I am seeing (a,3) and (b,3) in my example.

Yes, the name Evictor is confusing.

Thanks and Regards,
Vishnu Viswanath,
www.vishnuviswanath.com

On Mon, Mar 14, 2016 at 11:24 AM, Aljoscha Krettek <al...@apache.org>
wrote:


Re: Behavior of SlidingProcessingTimeWindow with CountTrigger

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
Sure, the evictors are a bit confusing (especially the fact that they are called evictors). They would more correctly be called “Keepers”. The process is the following:

1. The trigger fires.
2. The evictor decides which elements to keep: CountEvictor.of(3), for example, says to keep only three elements, and all others are evicted.
3. The elements that remain after the evictor are used for processing.
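The three steps can be modeled in plain Scala as below. This is a hypothetical sketch, not Flink's internal code: it assumes that, like CountEvictor.of(3), the evictor keeps the three most recently added elements of the window, and that sum(1) then adds up only the kept values.

```scala
// Hypothetical model of: trigger fires, evictor keeps some elements, the rest is processed.
object EvictorSketch {
  // Evictor step: keep only the last `keep` elements (assumed CountEvictor behavior).
  def evict[T](elements: Seq[T], keep: Int): Seq[T] = elements.takeRight(keep)

  // Called when the trigger fires: evict first, then sum the remaining values.
  def fire(key: String, values: Seq[Int], keep: Int): (String, Int) =
    (key, evict(values, keep).sum)

  def main(args: Array[String]): Unit = {
    val window = Seq.fill(5)(1)    // the values of five ("a", 1) records in one window
    println(("a", window.sum))     // without an evictor: prints (a,5)
    println(fire("a", window, 3))  // with a keep-3 evictor: prints (a,3)
  }
}
```

In this model the sum only ever sees the three kept elements, never the two evicted ones, which is consistent with the (a,3) and (b,3) lines in the question.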

Nowadays we mostly keep Evictors for legacy reasons, since the original window implementation was based on ideas from IBM InfoSphere Streams. See this part of their documentation for some explanation: https://www.ibm.com/support/knowledgecenter/#!/SSCRJU_4.0.0/com.ibm.streams.dev.doc/doc/windowhandling.html

- aljoscha
> On 14 Mar 2016, at 17:04, Vishnu Viswanath <vi...@gmail.com> wrote:


Re: Behavior of SlidingProcessingTimeWindow with CountTrigger

Posted by Vishnu Viswanath <vi...@gmail.com>.
Hi Aljoscha,

Wow, great illustration.

That was a very clear explanation. Yes, I did enter the elements fast for
case b, and I was seeing case a more often.
Also, I have sometimes seen a window fire when I enter only 1 or 2
elements; I believe that is an extension of case a, with respect to window 2.
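This effect can be reproduced with a small hypothetical Scala model (not Flink's implementation). It assumes that each window keeps its own element counter, fires when the counter reaches 5 and then starts counting from zero again, and it ignores the cleanup of a window once its time span has passed. The timestamps (in ms) are made up: a batch of five elements of which only one falls before the 20 s boundary, followed by a single later element.

```scala
import scala.collection.mutable

// Hypothetical per-window counters with a count threshold of 5.
// Windows are 20 s long and start every 10 s; not the Flink API.
object CarryOverSketch {
  val SizeMs = 20000L
  val SlideMs = 10000L
  val Threshold = 5

  // Start times of all windows [start, start + size) that contain ts.
  def windowStarts(ts: Long): Seq[Long] =
    (ts - ts % SlideMs) to (ts - SizeMs + 1) by -SlideMs

  // Feeds the timestamps in order and returns the start of every window that fires.
  def fired(timestamps: Seq[Long]): Seq[Long] = {
    val counts = mutable.Map.empty[Long, Int].withDefaultValue(0)
    timestamps.flatMap { ts =>
      windowStarts(ts).flatMap { start =>
        counts(start) += 1
        // assumed behavior: fire at the threshold, then reset this window's counter
        if (counts(start) >= Threshold) { counts(start) = 0; Some(start) } else None
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val firstBatch = Seq(19000L, 21000L, 22000L, 23000L, 24000L)
    println(fired(firstBatch).mkString(", "))            // prints 10000
    println(fired(firstBatch :+ 31000L).mkString(", "))  // prints 10000, 20000
  }
}
```

The first batch fires only the window starting at 10000 ms and leaves 4 elements counted in the window starting at 20000 ms, so the single element at 31000 ms is enough to fire that second window.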

Also, can you explain what happens when using an Evictor?
For example:


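import org.apache.flink.streaming.api.windowing.evictors.CountEvictor

// sev, socTextStream and the other imports as in my first example
val counts = socTextStream.flatMap { _.split("\\s") }
  .map { (_, 1) }
  .keyBy(0)
  .window(SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10)))
  .trigger(CountTrigger.of(5))
  .evictor(CountEvictor.of(3))
  .sum(1).setParallelism(4)

counts.print()
sev.execute()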

For the input

a
a
a
a
a
b
b
b
b
b

I got the output:

1> (a,3)
1> (b,3)
2> (b,3)

My assumption was that, when the trigger fires, the processing would be
done on all the items in the window, and then 3 items would be evicted from
the window, and the remaining items could also be part of the next
processing of that window. But here it looks like the sum is calculated only
on the items that were evicted from the window.

Could you please explain what is going on here?


Thanks and Regards,
Vishnu Viswanath,
www.vishnuviswanath.com

On Mon, Mar 14, 2016 at 5:27 AM, Aljoscha Krettek <al...@apache.org>
wrote:


Re: Behavior of SlidingProcessingTimeWindow with CountTrigger

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
I created a visualization to help explain the situation: http://s21.postimg.org/dofhcw52f/window_example.png

The SlidingProcessingTimeWindows assigner assigns elements to windows based on the current processing time. The CountTrigger only fires if a window contains 5 elements (or more). In your test, the windows for a, c and e fell into case b because you probably entered the letters very fast, so all five elements landed in both overlapping windows and each of the two windows fired. For b and d we have case a: the elements were far enough apart, or you happened to enter them right on a window boundary, such that only one window contains all of them. The other windows don’t contain enough elements to reach 5. In my drawing, window 1 contains 5 elements, while window 2 contains only 3 of them.
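To make the two cases concrete, here is a minimal Scala sketch that simulates the situation rather than calling Flink. It assumes the sliding assigner places an element with processing timestamp ts (in ms) into every window [start, start + 20000) whose start is a multiple of the 10 s slide and lies in (ts - 20000, ts], with no window offset, and it reports which windows collect 5 elements. The timestamps are hypothetical: a "fast" batch typed inside the overlap of two windows, and a "straddling" batch split across a window boundary.

```scala
// Hypothetical simulation of sliding processing-time windows (size 20 s,
// slide 10 s) combined with a count threshold of 5. Not the Flink API.
object SlidingWindowSketch {
  val SizeMs = 20000L
  val SlideMs = 10000L
  val Threshold = 5

  // Every window (start, end) with start a multiple of the slide and
  // start <= ts < end. Assumes non-negative timestamps and no window offset.
  def windowsFor(ts: Long): Seq[(Long, Long)] = {
    val lastStart = ts - (ts % SlideMs)
    (lastStart to (ts - SizeMs + 1) by -SlideMs).map(start => (start, start + SizeMs))
  }

  // Windows that collect at least Threshold elements, with their element counts.
  def firingWindows(timestamps: Seq[Long]): Map[(Long, Long), Int] =
    timestamps
      .flatMap(windowsFor)
      .groupBy(identity)
      .map { case (window, hits) => window -> hits.size }
      .filter { case (_, n) => n >= Threshold }

  def main(args: Array[String]): Unit = {
    // case b: all five elements typed quickly, inside the overlap of two windows
    val fast = Seq(21000L, 21500L, 22000L, 22500L, 23000L)
    // case a: three elements before the 20 s boundary, two after it
    val straddling = Seq(17000L, 18000L, 19000L, 21000L, 22000L)
    println(s"fast: ${firingWindows(fast).size} firing(s)")             // prints "fast: 2 firing(s)"
    println(s"straddling: ${firingWindows(straddling).size} firing(s)") // prints "straddling: 1 firing(s)"
  }
}
```

The fast batch reaches 5 in both windows that contain it, matching the doubled output for a, c and e; the straddling batch gives 3 and 2 to its outer windows and 5 only to the middle one, matching the single line for b and d.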

I hope this helps.

Cheers,
Aljoscha
> On 12 Mar 2016, at 19:19, Vishnu Viswanath <vi...@gmail.com> wrote: