Posted to user@flink.apache.org by Igor Berman <ig...@gmail.com> on 2016/06/03 12:55:37 UTC

Event processing time with lateness

Hi

According to a presentation by Tyler Akidau
https://docs.google.com/presentation/d/13YZy2trPugC8Zr9M8_TfSApSCZBGUDZdzi-WUz95JJw/present
Flink supports late arrivals for window processing, yet I've seen several
questions on the user list about late arrivals where the answer was,
roughly, "not for all use cases".
Can somebody clarify?

My case: I use event-time processing and want to aggregate with a
tumbling window. The events come from Kafka and might be late.
Currently we define the lateness threshold with the watermark (e.g. 5 minutes).
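A minimal plain-Java model of this setup, with no Flink dependency (the class and method names are hypothetical): events fall into one-minute tumbling windows, and the watermark trails the highest event timestamp seen so far by 5 minutes. In this model a window counts as complete once the watermark reaches its last millisecond, and an event whose window is already complete is late; Flink's exact boundary conventions may differ.

```java
import java.util.concurrent.TimeUnit;

// Plain-Java model (no Flink dependency); names are hypothetical.
class LatenessModel {
    static final long WINDOW_SIZE_MS = TimeUnit.MINUTES.toMillis(1);
    // Lateness threshold from the question: the watermark trails the max timestamp by 5 minutes.
    static final long MAX_OUT_OF_ORDERNESS_MS = TimeUnit.MINUTES.toMillis(5);

    private long maxTimestampSeen = Long.MIN_VALUE;

    // Start of the tumbling window containing ts (assumes ts >= 0).
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_SIZE_MS);
    }

    // Records an event timestamp and returns the resulting watermark.
    long observe(long ts) {
        maxTimestampSeen = Math.max(maxTimestampSeen, ts);
        return currentWatermark();
    }

    long currentWatermark() {
        // Before any event is seen, no window can be complete.
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return Long.MIN_VALUE;
        }
        return maxTimestampSeen - MAX_OUT_OF_ORDERNESS_MS;
    }

    // Late = the watermark has already reached the last millisecond of the event's window.
    boolean isLate(long ts) {
        long lastMillisOfWindow = windowStart(ts) + WINDOW_SIZE_MS - 1;
        return lastMillisOfWindow <= currentWatermark();
    }
}
```

For example, once an event at minute 10 has been observed, the watermark sits at minute 5, so an event stamped 3:30 is late while one stamped 5:30 is not.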

After the window fires, I want to save the aggregated result to persistent
storage (Redis/HBase) together with the window's start timestamp.

If I understand correctly, after this grace period a late event won't be
aggregated into the existing window; instead, the trigger will call the
aggregation function with only one element inside (the late one).

So if my window function writes to persistent storage, it will overwrite the
aggregated result with a new one that contains only that single element.
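A sketch of the overwrite problem as described here, under the assumption that a window's contents are discarded when it fires. A Map keyed by window start stands in for Redis/HBase; this is a simulation with hypothetical names, not Flink's window operator or a storage client.

```java
import java.util.HashMap;
import java.util.Map;

// Simulation of "state is discarded at firing"; the Map stands in for Redis/HBase.
class PurgeOnFireCounter {
    static final long WINDOW_SIZE_MS = 60_000L;

    private final Map<Long, Long> openWindows = new HashMap<>(); // window start -> count
    final Map<Long, Long> store = new HashMap<>();              // external store stand-in

    void onEvent(long ts) {
        long start = ts - (ts % WINDOW_SIZE_MS);
        openWindows.merge(start, 1L, Long::sum);
    }

    // Fire every window the watermark has completed, write it, then forget its contents.
    void onWatermark(long watermark) {
        openWindows.entrySet().removeIf(e -> {
            boolean complete = e.getKey() + WINDOW_SIZE_MS - 1 <= watermark;
            if (complete) {
                store.put(e.getKey(), e.getValue()); // last write wins
            }
            return complete; // purge: a later (late) event starts from an empty window
        });
    }
}
```

Three on-time events store a count of 3 for the window starting at 0; a single late event for that window then replaces it with 1.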

What I want to achieve is that a late arrival triggers the window function
with all elements (the late one plus all the others), so that the aggregated
result is complete.

Think, for example, of counting page visits per minute, where, due to various
problems, page-visit events might arrive late.
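A sketch of the desired behaviour for per-minute page-visit counts, assuming the window's state is kept after its on-time firing until the watermark passes the window end plus an allowed-lateness horizon (the 10-minute value is an arbitrary assumption, not from this thread). A late event inside the horizon updates the kept count and re-emits the complete value, upserted under the window start; events beyond the horizon are dropped. Names are hypothetical and no Flink API is used.

```java
import java.util.HashMap;
import java.util.Map;

// Per-minute page-visit counts whose state survives the on-time firing; names hypothetical.
class RetainingVisitCounter {
    static final long WINDOW_SIZE_MS = 60_000L;
    static final long ALLOWED_LATENESS_MS = 10 * 60_000L; // assumed horizon, not from the thread

    private final Map<Long, Long> windows = new HashMap<>(); // kept state: window start -> count
    final Map<Long, Long> store = new HashMap<>();           // Redis/HBase stand-in, upserted by start
    private long watermark = Long.MIN_VALUE;
    long droppedEvents = 0;

    void onEvent(long ts) {
        long start = ts - (ts % WINDOW_SIZE_MS);
        long lastMillis = start + WINDOW_SIZE_MS - 1;
        if (lastMillis + ALLOWED_LATENESS_MS <= watermark) {
            droppedEvents++; // too late: the window's state has already been cleaned up
            return;
        }
        long count = windows.merge(start, 1L, Long::sum);
        if (lastMillis <= watermark) {
            store.put(start, count); // late firing: emits the complete count, not just 1
        }
    }

    void onWatermark(long newWatermark) {
        final long previous = watermark;
        watermark = Math.max(watermark, newWatermark);
        windows.entrySet().removeIf(e -> {
            long lastMillis = e.getKey() + WINDOW_SIZE_MS - 1;
            if (lastMillis > previous && lastMillis <= watermark) {
                store.put(e.getKey(), e.getValue()); // on-time firing, state is kept
            }
            return lastMillis + ALLOWED_LATENESS_MS <= watermark; // clean up after the horizon
        });
    }
}
```

Because every write carries the full count and is keyed by window start, a re-firing simply replaces the stored value with a more complete one. The trade-off is that window state must be kept in memory (or state storage) for the whole lateness horizon.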

thanks in advance

Re: Event processing time with lateness

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Igor,
you might be interested in this doc about how we want to improve handling
of late data and some other things in the windowing API:
https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit?usp=sharing

I've sent it around several times, but you never know who's already aware
of it. :-)

Cheers,
Aljoscha

On Fri, 3 Jun 2016 at 22:02 Michael Tamillow <mi...@gmail.com>
wrote:

> Super cool stuff

Re: Event processing time with lateness

Posted by Michael Tamillow <mi...@gmail.com>.
Super cool stuff

On Fri, Jun 3, 2016 at 10:55 AM, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:

> You are welcome!

Re: Event processing time with lateness

Posted by Kostas Kloudas <k....@data-artisans.com>.
You are welcome!

> On Jun 3, 2016, at 4:40 PM, Igor Berman <ig...@gmail.com> wrote:
> 
> thanks Kosta

Re: Event processing time with lateness

Posted by Igor Berman <ig...@gmail.com>.
thanks Kosta

On 3 June 2016 at 16:47, Kostas Kloudas <k....@data-artisans.com> wrote:

> Hi Igor,
>
> To handle late events in Flink you would have to implement your own custom
> trigger.

Re: Event processing time with lateness

Posted by Elias Levy <fe...@gmail.com>.
On Fri, Jun 3, 2016 at 6:47 AM, Kostas Kloudas <k....@data-artisans.com>
wrote:

> To see a relatively more complex example of such a trigger and how to
> implement it,
> you can have a look at this implementation:
> https://github.com/dataArtisans/beam_comp/blob/master/src/main/java/com/dataartisans/beam_comparison/customTriggers/EventTimeTriggerWithEarlyAndLateFiring.java
>

I've modified this trigger so that firings are suppressed unless there are
new events between timers. This can significantly reduce the number of
emitted events, which could mean far fewer writes to a downstream data store.
See https://gist.github.com/eliaslevy/ec840444607b9a5dd5aa3eb2cdd77932.
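A minimal model of the suppression idea (not the code in the linked gist): a timer-driven firing is skipped unless at least one element arrived since the previous firing. In a real trigger the flag would live in per-window state; here it is a plain field, and the class name is hypothetical.

```java
// Suppresses timer-driven firings when nothing new arrived (hypothetical class, not the gist's code).
class SuppressingFiringGate {
    private boolean newElementsSinceLastFiring = false;
    int firings = 0;

    void onElement() {
        newElementsSinceLastFiring = true;
    }

    // Returns true if the timer should fire the window; false suppresses a redundant output.
    boolean onTimer() {
        if (!newElementsSinceLastFiring) {
            return false;
        }
        newElementsSinceLastFiring = false;
        firings++;
        return true;
    }
}
```

With one element, then two timers, then two more elements and a third timer, only two firings reach the downstream store instead of three.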

Also, I find the accumulating behavior somewhat unintuitive: when
accumulation is disabled, the trigger only purges when the time window ends.
When discarding is in effect, it seems more natural for purging to occur at
each firing, whether early, at the window's event-time end, or late.
Otherwise, you may end up with output events of different semantics. E.g.
with the current behavior, if you are implementing a counter, early firings
will result in partial counts until the window end; after that, late firings
will give you partial counts of the delta from the window-end count. It would
be more consistent to generate either partial counts at all firings or deltas
at all firings, so that the output of the operator can be processed the same
way downstream.
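The three purge policies discussed above can be compared with a small counter simulation. Here arrivals[i] is the number of elements that arrived before firing i, and onTimeIndex marks the firing at the window's event-time end; the policy names are descriptive labels of this sketch, not Flink settings.

```java
// Descriptive labels for this sketch only; they are not Flink configuration options.
enum PurgePolicy { PURGE_AT_WINDOW_END_ONLY, ACCUMULATE, DISCARD_EACH_FIRING }

class FiringOutputs {
    // arrivals[i]: elements that arrived before firing i; onTimeIndex: the firing at the
    // window's event-time end. Returns the count emitted by each firing.
    static long[] emitted(long[] arrivals, int onTimeIndex, PurgePolicy policy) {
        long[] out = new long[arrivals.length];
        long state = 0;
        for (int i = 0; i < arrivals.length; i++) {
            state += arrivals[i];
            out[i] = state;
            boolean purge = policy == PurgePolicy.DISCARD_EACH_FIRING
                    || (policy == PurgePolicy.PURGE_AT_WINDOW_END_ONLY && i == onTimeIndex);
            if (purge) {
                state = 0;
            }
        }
        return out;
    }
}
```

For arrivals {2, 3, 4, 1, 1} with the on-time firing at index 2, the policies emit [2, 5, 9, 1, 2], [2, 5, 9, 10, 11] and [2, 3, 4, 1, 1] respectively. Only the first mixes running totals (before the window end) with deltas (after it).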

Re: Event processing time with lateness

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Igor,

To handle late events in Flink you would have to implement your own custom trigger.

To see a relatively more complex example of such a trigger and how to implement it,
you can have a look at this implementation: https://github.com/dataArtisans/beam_comp/blob/master/src/main/java/com/dataartisans/beam_comparison/customTriggers/EventTimeTriggerWithEarlyAndLateFiring.java

It implements the trigger described in this article (just before the conclusions section):
http://data-artisans.com/why-apache-beam/
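The decision logic of an event-time trigger with early and late firings can be sketched as pure functions. The Decision enum borrows the value names of Flink's TriggerResult, but this is neither Flink's Trigger interface nor the linked implementation; earlyIntervalMs and allowedLatenessMs are assumed parameters, and elements beyond the lateness horizon are assumed to be dropped before they reach the trigger.

```java
// Value names borrowed from Flink's TriggerResult; this is NOT Flink's Trigger interface.
enum Decision { CONTINUE, FIRE, PURGE, FIRE_AND_PURGE }

// Pure decision functions for an event-time trigger with early and late firings (hypothetical).
class EarlyLateTriggerLogic {
    final long earlyIntervalMs;   // assumed period of speculative (early) firings
    final long allowedLatenessMs; // assumed horizon after which window state is dropped

    EarlyLateTriggerLogic(long earlyIntervalMs, long allowedLatenessMs) {
        this.earlyIntervalMs = earlyIntervalMs;
        this.allowedLatenessMs = allowedLatenessMs;
    }

    // An element arriving after the watermark passed its window's end triggers a late firing.
    // FIRE (not FIRE_AND_PURGE) keeps the contents, so the late firing sees every element.
    Decision onElement(long windowMaxTs, long watermark) {
        return windowMaxTs <= watermark ? Decision.FIRE : Decision.CONTINUE;
    }

    // Event-time timers: the on-time firing at the window end, cleanup after the horizon.
    Decision onEventTime(long timerTs, long windowMaxTs) {
        if (timerTs == windowMaxTs) {
            return Decision.FIRE;
        }
        if (timerTs >= windowMaxTs + allowedLatenessMs) {
            return Decision.PURGE;
        }
        return Decision.CONTINUE;
    }

    // Processing-time timers: early firings only while the window is still open.
    Decision onProcessingTime(long windowMaxTs, long watermark) {
        return watermark < windowMaxTs ? Decision.FIRE : Decision.CONTINUE;
    }

    // Next early-firing timer, aligned to multiples of earlyIntervalMs.
    long nextEarlyTimer(long nowMs) {
        return nowMs - (nowMs % earlyIntervalMs) + earlyIntervalMs;
    }
}
```

The key design choice for the use case in the question is that no firing purges the window until the cleanup timer, which is what allows a late firing to aggregate over all elements rather than just the late one.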

Thanks,
Kostas
