Posted to user@flink.apache.org by Paul Joireman <pa...@physiq.com> on 2016/09/01 18:16:17 UTC

Windows and Watermarks Clarification

Hi all,


Just a point of clarification on how watermarks are generated. I'd like to use a SlidingEventTime window of, say, 5 minutes with a 30-second slide. The incoming data stream has elements from which I can extract the timestamp, but they may arrive out of order, so I chose to implement the following timestamp assigner.


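    my_stream.assignTimestampsAndWatermarks(
        // Tolerate elements that arrive up to 10 seconds out of order.
        new BoundedOutOfOrdernessTimestampExtractor<MyElement>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(final MyElement element) {
                // Event time of the element, in epoch milliseconds.
                return element.getTimestamp();
            }
        });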

With this definition and the code for BoundedOutOfOrdernessTimestampExtractor, my understanding is that for each incoming element a watermark will be generated that is 10 seconds behind the current timestamp. If the end time of any of the sliding windows is earlier than an emitted watermark, that window (or those windows) will fire, triggering processing of the window(s). Is this correct?
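
For reference, a 5-minute window with a 30-second slide means every element belongs to 10 overlapping windows, each of which can fire separately. The following is a minimal plain-Java sketch of that assignment arithmetic, not Flink code; the class and method names are hypothetical, and it assumes windows are aligned to multiples of the slide with no offset.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: lists the sliding windows
// [start, end) that contain a given event timestamp.
final class SlidingWindowSketch {

    /** Returns {start, end} pairs in milliseconds, newest window first. */
    static List<long[]> assignWindows(long timestampMs, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Start of the most recent window that already contains the timestamp.
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }
}
```

Calling `SlidingWindowSketch.assignWindows(100_000L, 300_000L, 30_000L)` yields 10 windows, from [90000, 390000) back to [-180000, 120000).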

Paul


Re: Windows and Watermarks Clarification

Posted by Aljoscha Krettek <al...@apache.org>.
Just one clarification: even with a specified allowed lateness the window
will still be evaluated once the watermark passes the end of the window.
It's just that with allowed lateness the window contents and state will be
kept around a bit longer to allow any late elements to update the
results. What happens when late elements arrive depends on the trigger.
With the default EventTimeTrigger you will get a new firing that processes
the previously available elements along with the new (late-arriving)
element.
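
The behaviour described above can be sketched in plain Java as follows. This is an illustration only, not Flink code: the class, enum, and method names are hypothetical, and the inclusive comparisons against `end - 1` are an assumption about how the window's last included timestamp is compared with the watermark, which should be checked against the Flink version in use.

```java
// Hypothetical model of what happens to an element of an event-time window
// [start, end), depending on the watermark at the moment the element arrives.
final class AllowedLatenessSketch {

    enum Outcome { BUFFERED, LATE_FIRING, DROPPED }

    static Outcome onElement(long windowEndMs, long allowedLatenessMs, long watermarkMs) {
        // Assumption: the last timestamp that belongs to the window is end - 1.
        long maxTimestampMs = windowEndMs - 1;
        if (watermarkMs >= maxTimestampMs + allowedLatenessMs) {
            // Window state has already been cleaned up, so the element is discarded.
            return Outcome.DROPPED;
        }
        if (watermarkMs >= maxTimestampMs) {
            // The window has fired once already; the late element causes another firing.
            return Outcome.LATE_FIRING;
        }
        // The watermark has not reached the window end yet; the element just waits.
        return Outcome.BUFFERED;
    }
}
```

With a window ending at 12:10 (43,800,000 ms into the day) and 5 minutes of allowed lateness, an element arriving while the watermark is at 12:05 is buffered, one arriving at watermark 12:12 causes a late firing, and one arriving at watermark 12:16 is dropped.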

Cheers,
Aljoscha


Re: Windows and Watermarks Clarification

Posted by Fabian Hueske <fh...@gmail.com>.
A 10 minute tumbling window that starts at 12:00 is evaluated after a
watermark is observed that is > 12:10.
If the same tumbling window has an allowed lateness of 5 minutes, it is
evaluated once a watermark > 12:15 is observed. However, only elements with
timestamps 12:00 <= x < 12:10 are in the window.
Elements that arrive even after the allowed lateness period are simply
dropped.
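
The half-open range 12:00 <= x < 12:10 follows from how a timestamp maps to its tumbling window. A minimal plain-Java sketch of that arithmetic, assuming windows aligned to multiples of the window size with no offset; the class and method names are hypothetical and not part of Flink:

```java
// Hypothetical helper: computes the tumbling window [start, end) for a timestamp.
final class TumblingWindowSketch {

    /** Inclusive start of the window containing the timestamp. */
    static long windowStart(long timestampMs, long sizeMs) {
        // floorMod keeps the result correct for negative timestamps as well.
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** Exclusive end of the window containing the timestamp. */
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }
}
```

With a 10-minute size (600,000 ms) and times measured in milliseconds since midnight, 12:09:59.999 maps to the window starting at 12:00, while 12:10:00.000 already belongs to the next window.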

Best, Fabian


Re: Windows and Watermarks Clarification

Posted by Paul Joireman <pa...@physiq.com>.
Thanks Fabian,


This is making more sense. Is allowedLateness(Time.seconds(x)) then evaluated relative to maxEventTime - lastWaterMarkTime? That is, if (maxEventTime - lastWaterMarkTime) > x * 1000, is the window then evaluated?


Paul


Re: Windows and Watermarks Clarification

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Paul,

BoundedOutOfOrdernessTimestampExtractor implements the
AssignerWithPeriodicWatermarks interface.
This means Flink will ask the assigner for the current watermark at regular
intervals (configurable via
StreamExecutionEnvironment.getConfig().setAutoWatermarkInterval()).
The watermark will be 10 seconds earlier than the highest timestamp observed
so far.

An event-time window is evaluated when the current watermark is higher /
later than the window's end time. With allowedLateness() the window
evaluation can be deferred to allow late elements (elements whose timestamp
is before the current watermark) to join the window before it is evaluated.
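
The periodic behaviour described above can be sketched in plain Java as follows. This is not the Flink class itself, only a model of the idea under the assumption that each element merely updates the highest timestamp seen and that the watermark is computed when the runtime asks for it; the class and method names are hypothetical.

```java
// Hypothetical model of a bounded-out-of-orderness periodic watermark assigner.
final class BoundedOutOfOrdernessSketch {

    private final long maxOutOfOrdernessMs;
    private long maxTimestampMs;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low, but high enough that the subtraction below cannot overflow.
        this.maxTimestampMs = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    /** Called for every element: records the timestamp, emits no watermark. */
    void onElement(long eventTimestampMs) {
        maxTimestampMs = Math.max(maxTimestampMs, eventTimestampMs);
    }

    /** Called at each watermark interval: highest timestamp so far minus the bound. */
    long currentWatermark() {
        return maxTimestampMs - maxOutOfOrdernessMs;
    }
}
```

With a 10-second bound, after elements with timestamps 60,000 ms and 45,000 ms the watermark is 50,000 ms. The out-of-order element at 45,000 ms does not move it backwards, because only the maximum timestamp is tracked.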

Let me know if you have further questions,
Fabian

