Posted to user@flink.apache.org by Jerry Peng <je...@gmail.com> on 2017/08/23 00:22:48 UTC

Question about windowing

Hello,

I have a question regarding windowing and triggering. I am trying to
connect the dots between the simple windowing API, e.g.

stream.countWindow(1000, 100)

to the underlying representation using the trigger and evictor API:

stream.window(GlobalWindows.create())
  .evictor(CountEvictor.of(1000))
  .trigger(CountTrigger.of(100))


How is the above equivalent to the semantics of a window with a length
of 1000 tuples and a sliding interval of 100 tuples?

And for time duration windows:

stream.timeWindow(Time.seconds(5), Time.seconds(1))

which maps to:

stream.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)))
  .trigger(ProcessingTimeTrigger.create())

Why isn't it mapped to something like:

stream.window(SlidingProcessingTimeWindows.create())
  .trigger(ProcessingTimeTrigger.of(1))
  .evictor(TimeEvictor.of(5))

?

Thanks for any help in advance!

Best,

Jerry

Re: Question about windowing

Posted by Aljoscha Krettek <al...@apache.org>.
Yes, this is a very good explanation, Tony!

I'd like to add that "Evictor" is not really a good name for what it does. It should be more like "Keeper" or "Retainer" because what a "CountEvictor.of(1000)" really does is to evict everything but the last 1000 elements, so it should be called "CountRetainer.of(1000)". 
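To make the "retainer" reading concrete, here is a minimal plain-Java sketch of the effect CountEvictor.of(n) has on a window's buffered elements when the window fires: the oldest elements are dropped until at most n remain. The class and the retainLast() helper are hypothetical illustrations, not Flink's actual Evictor interface.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;

// Hypothetical helper, not Flink's Evictor interface: it only mimics the effect
// of CountEvictor.of(maxCount) on a window's buffered elements.
public class CountRetainerSketch {

    // Evict from the front (oldest first) until at most maxCount elements remain,
    // i.e. "retain the last maxCount elements".
    static <T> void retainLast(Deque<T> window, int maxCount) {
        while (window.size() > maxCount) {
            window.pollFirst();
        }
    }

    public static void main(String[] args) {
        Deque<Integer> window = new ArrayDeque<>();
        for (int i = 1; i <= 7; i++) {
            window.addLast(i); // elements arrive in order 1..7
        }
        retainLast(window, 3); // what CountEvictor.of(3) would keep
        System.out.println(new ArrayList<>(window)); // prints [5, 6, 7]
    }
}
```

Read this way, "CountRetainer.of(1000)" would simply be retainLast(window, 1000).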

(The names Trigger and Evictor were originally inspired by IBM InfoSphere Streams, AFAIK.)

Best,
Aljoscha

> On 23. Aug 2017, at 09:56, 魏偉哲 <to...@gmail.com> wrote:


Re: Question about windowing

Posted by 魏偉哲 <to...@gmail.com>.
Hi Jerry,

You can learn about Flink's windowing mechanics in this blog post:
https://flink.apache.org/news/2015/12/04/Introducing-windows.html

To my understanding, window() defines how Flink uses a WindowAssigner to
assign each element to the right window(s), trigger() defines when a window
fires, and evictor() defines which elements in the window are passed to the
evaluation function.

Therefore, for time-based windows it is natural to use window() to describe
which windows an element should be assigned to, and to fire them based on
processing time.
For the count window, we actually need to count the elements, so we insert
all elements into a single global window, fire it every 100 elements, and
keep only the last 1000 elements for the evaluation function.
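The count-window mapping can be checked with a small simulation. The sketch below is plain Java, not Flink code: it models one key's global window, a counter standing in for CountTrigger.of(slide), and a trimming loop standing in for CountEvictor.of(size), which only runs when the window fires. The class and method names are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java simulation (not Flink code) of
//   window(GlobalWindows.create())
//     .evictor(CountEvictor.of(size))
//     .trigger(CountTrigger.of(slide))
// for a single key. Each firing is reported as {first, last, count}.
public class SlidingCountWindowSketch {

    static List<long[]> simulate(long numElements, int size, int slide) {
        Deque<Long> globalWindow = new ArrayDeque<>(); // the one and only window
        int countSinceLastFire = 0;                     // stand-in for CountTrigger's counter
        List<long[]> firings = new ArrayList<>();
        for (long e = 1; e <= numElements; e++) {
            globalWindow.addLast(e);                    // every element goes to the global window
            countSinceLastFire++;
            if (countSinceLastFire == slide) {          // trigger: fire every `slide` elements
                countSinceLastFire = 0;
                while (globalWindow.size() > size) {    // evictor: keep only the last `size`
                    globalWindow.pollFirst();
                }
                firings.add(new long[] {globalWindow.peekFirst(), globalWindow.peekLast(),
                                        globalWindow.size()});
            }
        }
        return firings;
    }

    public static void main(String[] args) {
        for (long[] f : simulate(1500, 1000, 100)) {
            System.out.printf("fired: elements %d..%d (%d elements)%n", f[0], f[1], f[2]);
        }
    }
}
```

With size 1000 and slide 100 the simulation fires after every 100th element. The first nine firings see only 100, 200, ..., 900 elements; from the tenth firing on, each sees exactly the last 1000 elements (1..1000, 101..1100, ...), so consecutive results overlap by 900 elements, which is the sliding count window described in the question.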

One more thing: in the sliding count window, each element is placed in only
one window (the global window), but in the sliding time window each element
is duplicated and inserted into multiple windows.
Taking your case as an example, each element would be placed into five
different windows, each representing a different time range.
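The "five different windows" follow from size / slide = 5 s / 1 s. Below is a minimal sketch of the assignment step, assuming window starts are aligned to multiples of the slide with zero offset, which appears to match the default behaviour of SlidingProcessingTimeWindows. The class and method are hypothetical; this is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch (not Flink code): find every sliding window [start, start + size)
// that contains `timestamp`, with window starts aligned to multiples of `slide`.
public class SlidingTimeAssignSketch {

    static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, slide); // latest start <= timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // Size 5 s, slide 1 s, element arriving at t = 12,300 ms
        for (long[] w : assignWindows(12_300L, 5_000L, 1_000L)) {
            System.out.printf("[%d, %d)%n", w[0], w[1]);
        }
    }
}
```

For an element at t = 12,300 ms this yields [12000, 17000), [11000, 16000), [10000, 15000), [9000, 14000) and [8000, 13000): five overlapping windows that all contain the element.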

Hope this will help you.

Regards,
Tony Wei


