Posted to user@flink.apache.org by Yassine MARZOUGUI <y....@mindlytix.com> on 2016/10/17 09:52:20 UTC

BoundedOutOfOrdernessTimestampExtractor and allowedlateness

Hi,

I'm a bit confused about how Flink deals with late elements after the
introduction of allowedLateness to windows. What is the difference between
using a BoundedOutOfOrdernessTimestampExtractor(Time.seconds(X)) and
allowedLateness(Time.seconds(X))? What if one is used and the other is not?
And what if a different lateness is used in each one? Could you please
clarify it on the basis of a simple example? Thank you.

Best,
Yassine

Re: BoundedOutOfOrdernessTimestampExtractor and allowedlateness

Posted by Yassine MARZOUGUI <y....@mindlytix.com>.
Hi Fabian,

Thank you very much for the great answer and example, I appreciate it!
It is all clear now.

Best,
Yassine

Re: BoundedOutOfOrdernessTimestampExtractor and allowedlateness

Posted by Fabian Hueske <fh...@gmail.com>.
I have to extend my answer:

The behavior of allowedLateness that I described applies only if the window
trigger returns FIRE when the window is evaluated (this is the default
behavior of most triggers).

If the trigger returns FIRE_AND_PURGE, the state of the window is purged
when the function is evaluated and late events are processed alone, i.e.,
in my example <12:09, G> would be processed without [A, B, C, D].
Once the allowed lateness has passed, all window state is purged regardless
of the trigger.
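
As a rough illustration of the difference, here is a minimal plain-Python
sketch. It is a toy model, not Flink code: the WindowState class, the
purge_on_fire flag and the replay helper are hypothetical names made up for
this example. It only assumes that FIRE keeps the buffered elements after an
evaluation while FIRE_AND_PURGE drops them.

```python
class WindowState:
    """Toy model of one window's buffered elements; not Flink's API."""

    def __init__(self, purge_on_fire):
        # True models FIRE_AND_PURGE, False models FIRE (assumed semantics).
        self.purge_on_fire = purge_on_fire
        self.contents = []

    def add(self, value):
        self.contents.append(value)

    def fire(self):
        # Evaluate the window: hand the current contents to the window function.
        result = list(self.contents)
        if self.purge_on_fire:
            self.contents.clear()
        return result


def replay(purge_on_fire):
    window = WindowState(purge_on_fire)
    for value in ["A", "B", "C", "D"]:
        window.add(value)
    first = window.fire()   # on-time firing, triggered by <WM, 12:12>
    window.add("G")         # late element <12:09, G>, within allowed lateness
    late = window.fire()    # late firing caused by G
    return first, late


fire_only = replay(purge_on_fire=False)
fire_and_purge = replay(purge_on_fire=True)
print(fire_only)       # (['A', 'B', 'C', 'D'], ['A', 'B', 'C', 'D', 'G'])
print(fire_and_purge)  # (['A', 'B', 'C', 'D'], ['G'])
```

With FIRE the late update contains the full window again, so downstream
consumers can treat it as a replacement. With FIRE_AND_PURGE the late update
contains only G, so downstream consumers would have to combine it with the
earlier result themselves.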

Best, Fabian

Re: BoundedOutOfOrdernessTimestampExtractor and allowedlateness

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Yassine,

the difference is the following:

1) The BoundedOutOfOrdernessTimestampExtractor is a built-in timestamp
extractor and watermark assigner.
A timestamp extractor tells Flink when an event happened, i.e., it extracts
a timestamp from the event. A watermark assigner tells Flink what the
current logical time is.
The BoundedOutOfOrdernessTimestampExtractor works as follows: When Flink
asks what the current time is, it returns the latest observed timestamp
minus a configurable bound. This bound is the safety margin for late data.
A record whose timestamp is lower than the last watermark is considered to
be late.
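
The rule can be sketched in a few lines of plain Python. This is only a toy
model under the assumptions stated in this mail, not Flink's implementation:
the BoundedWatermarks class and the t helper are hypothetical names, time is
counted in whole minutes, and the real extractor emits watermarks
periodically rather than on demand.

```python
def t(hhmm):
    """Convert 'HH:MM' to minutes since midnight."""
    hours, minutes = hhmm.split(":")
    return int(hours) * 60 + int(minutes)


class BoundedWatermarks:
    """Toy model of the rule above; this is not Flink's actual class."""

    def __init__(self, bound_minutes):
        self.bound = bound_minutes
        self.max_timestamp = None  # latest event timestamp observed so far

    def on_event(self, timestamp):
        # Out-of-order events never move the maximum backwards.
        if self.max_timestamp is None or timestamp > self.max_timestamp:
            self.max_timestamp = timestamp

    def current_watermark(self):
        # Latest observed timestamp minus the configured bound.
        if self.max_timestamp is None:
            return None
        return self.max_timestamp - self.bound

    def is_late(self, timestamp):
        # Late means: timestamp lower than the current watermark.
        watermark = self.current_watermark()
        return watermark is not None and timestamp < watermark


wm = BoundedWatermarks(bound_minutes=2)
wm.on_event(t("12:01"))
wm.on_event(t("12:04"))
watermark_after_1204 = wm.current_watermark()  # 12:04 - 2 minutes = 12:02
is_1202_late = wm.is_late(t("12:02"))          # False: 12:02 is not below 12:02
wm.on_event(t("12:16"))
is_1209_late = wm.is_late(t("12:09"))          # True: 12:09 is below 12:14
```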

2) The allowedLateness parameter of time windows tells Flink how long to
keep state around after the window was evaluated.
If data arrives after the evaluation and before the allowedLateness has
passed, the window function is applied again and an update is sent out.

Let's look at an example.
Assume you have a BoundedOutOfOrdernessTimestampExtractor with a 2-minute
bound and a 10-minute tumbling window that starts at 12:00 and ends at
12:10, and that the following data arrives in this order:

12:01, A
12:04, B
WM, 12:02 // 12:04 - 2 minutes
12:02, C
12:08, D
12:14, E
WM, 12:12 // 12:14 - 2 minutes
12:16, F
WM, 12:14 // 12:16 - 2 minutes
12:09, G

== no allowed lateness
The window operator advances the logical time to 12:12 when it receives
<WM, 12:12>. It evaluates the window, which contains [A, B, C, D] at this
time, and then purges its state. <12:09, G> arrives later and is ignored.

== allowed lateness of 3 minutes
The window operator evaluates the window when <WM, 12:12> is received, but
its state is not purged yet. The state is purged when <WM, 12:14> is
received, because it is the first watermark at or after 12:13 (window end
time 12:10 + 3 minutes allowed lateness). <12:09, G> arrives after that and
is again ignored.

== allowed lateness of 5 minutes
The window operator evaluates the window when <WM, 12:12> is received, but
its state is not purged yet. When <12:09, G> is received, the window is
again evaluated but this time with [A, B, C, D, G] and an update is sent
out. The state is purged when a watermark of >=12:15 is received.
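
The three cases can be replayed with a small plain-Python simulation. This
is a sketch under simplifying assumptions, not Flink code: run_window, t and
STREAM are hypothetical names, time is counted in whole minutes, the trigger
is assumed to behave like FIRE, and the window is assumed to fire once a
watermark reaches the window end and to be purged once a watermark reaches
window end + allowed lateness.

```python
def t(hhmm):
    """Convert 'HH:MM' to minutes since midnight."""
    hours, minutes = hhmm.split(":")
    return int(hours) * 60 + int(minutes)


# The example stream, in arrival order. "EV" is an event, "WM" a watermark.
STREAM = [
    ("EV", t("12:01"), "A"),
    ("EV", t("12:04"), "B"),
    ("WM", t("12:02"), None),
    ("EV", t("12:02"), "C"),
    ("EV", t("12:08"), "D"),
    ("EV", t("12:14"), "E"),
    ("WM", t("12:12"), None),
    ("EV", t("12:16"), "F"),
    ("WM", t("12:14"), None),
    ("EV", t("12:09"), "G"),
]


def run_window(stream, start, end, allowed_lateness):
    """Return every result emitted for the single window [start, end)."""
    contents, emitted = [], []
    fired = purged = False
    for kind, ts, value in stream:
        if kind == "WM":
            if not fired and ts >= end:
                fired = True                     # first, on-time evaluation
                emitted.append(list(contents))
            if fired and not purged and ts >= end + allowed_lateness:
                purged = True                    # state is discarded for good
                contents = []
        elif start <= ts < end and not purged:
            contents.append(value)
            if fired:
                emitted.append(list(contents))   # late update
        # Events outside the window (E, F) or after the purge are skipped.
    return emitted


results = {
    lateness: run_window(STREAM, t("12:00"), t("12:10"), lateness)
    for lateness in (0, 3, 5)
}
for lateness, emitted in results.items():
    print(lateness, emitted)
# 0 [['A', 'B', 'C', 'D']]
# 3 [['A', 'B', 'C', 'D']]
# 5 [['A', 'B', 'C', 'D'], ['A', 'B', 'C', 'D', 'G']]
```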

So, watermarks tell Flink what time it is, and allowed lateness tells
the system when state should be discarded and all later arriving data be
ignored.
These issues are related but not exactly the same thing. For instance, you
can counter late data by increasing either the watermark bound or the
lateness parameter.
Increasing the watermark bound will yield higher latencies because windows
are evaluated later.
Configuring allowedLateness will allow for earlier results, but you have to
cope with the updates downstream.

Please let me know if you have questions.

Best, Fabian
