You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Soheil Pourbafrani <so...@gmail.com> on 2018/07/30 09:05:51 UTC

watermark VS window trigger

Suppose we have a time window of 10 milliseconds and we use EventTime.
First, we determine how Flink can get time and watermark from
incoming messages, after that, we set a key for the stream and set a time
window.

aggregatedTuple
                .assignTimestampsAndWatermarks(new SampleTimestampExtractor())
                .keyBy(1).timeWindow(Time.milliseconds(1000))/*.countWindow(3)*/
                .reduce()

My understanding of the data flow in this scenario is the following:

Flink advanced time according to the timestamp of Incoming data into the
aggregatedTuple variable while for each message get the timestamp and
watermark.
As I use Periodic Watermarks, according to default watermark interval
(200ms), watermarks will be updated. After that according to the
current watermark, data with the timestamp between the last watermark and
current watermark will be released and go to the next steps (keyBy,
timeWindow, reduce). If Flink received a data but an appropriate watermark
didn't emit for that data yet, Flink didn't send that data to the next
steps and keep it until it's appropriate watermark will be emitted.

Is that correct?

Re: watermark VS window trigger

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

Watermarks are not holding back records. Instead they define the event-time
at an operator (as Vino said) and can trigger the processing of data if the
logic of an operator is based on time.
For example, a window operator can emit complete results for a window once
the time passed the window's end timestamp.
Operators that do not act on time, such as mappers or filters, emit records
at as soon as possible without waiting for watermarks.

Best, Fabian

2018-07-30 11:37 GMT+02:00 vino yang <ya...@gmail.com>:

> Hi Soheil,
>
> I feel that some of your understanding is a bit problematic.
>
> *"After that according to the current watermark, data with the timestamp
> between the last watermark and current watermark will be released and go to
> the next steps"*
>
> The main role of Watermark here is to define the progress of the event
> time, which will serve as the time base for the window to trigger. Before
> the time window, the upstream will only generate a Watermark according to a
> specific cycle, and then raise the Watermark of the downstream task while
> flowing downstream.
>
> You can read "event time & watermark" documentation [1].
>
> [1]: https://ci.apache.org/projects/flink/flink-docs-
> master/dev/event_time.html
>
> Thanks, vino.
>
>
> 2018-07-30 17:05 GMT+08:00 Soheil Pourbafrani <so...@gmail.com>:
>
>> Suppose we have a time window of 10 milliseconds and we use EventTime.
>> First, we determine how Flink can get time and watermark from
>> incoming messages, after that, we set a key for the stream and set a time
>> window.
>>
>> aggregatedTuple
>>                 .assignTimestampsAndWatermarks(new SampleTimestampExtractor())
>>                 .keyBy(1).timeWindow(Time.milliseconds(1000))/*.countWindow(3)*/
>>                 .reduce()
>>
>> My understanding of the data flow in this scenario is the following:
>>
>> Flink advanced time according to the timestamp of Incoming data into the
>> aggregatedTuple variable while for each message get the timestamp and
>> watermark.
>> As I use Periodic Watermarks, according to default watermark interval
>> (200ms), watermarks will be updated. After that according to the
>> current watermark, data with the timestamp between the last watermark and
>> current watermark will be released and go to the next steps (keyBy,
>> timeWindow, reduce). If Flink received a data but an appropriate watermark
>> didn't emit for that data yet, Flink didn't send that data to the next
>> steps and keep it until it's appropriate watermark will be emitted.
>>
>> Is that correct?
>>
>
>

Re: watermark VS window trigger

Posted by vino yang <ya...@gmail.com>.
Hi Soheil,

I feel that some of your understanding is a bit problematic.

*"After that according to the current watermark, data with the timestamp
between the last watermark and current watermark will be released and go to
the next steps"*

The main role of Watermark here is to define the progress of the event
time, which will serve as the time base for the window to trigger. Before
the time window, the upstream will only generate a Watermark according to a
specific cycle, and then raise the Watermark of the downstream task while
flowing downstream.

You can read "event time & watermark" documentation [1].

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html

Thanks, vino.


2018-07-30 17:05 GMT+08:00 Soheil Pourbafrani <so...@gmail.com>:

> Suppose we have a time window of 10 milliseconds and we use EventTime.
> First, we determine how Flink can get time and watermark from
> incoming messages, after that, we set a key for the stream and set a time
> window.
>
> aggregatedTuple
>                 .assignTimestampsAndWatermarks(new SampleTimestampExtractor())
>                 .keyBy(1).timeWindow(Time.milliseconds(1000))/*.countWindow(3)*/
>                 .reduce()
>
> My understanding of the data flow in this scenario is the following:
>
> Flink advanced time according to the timestamp of Incoming data into the
> aggregatedTuple variable while for each message get the timestamp and
> watermark.
> As I use Periodic Watermarks, according to default watermark interval
> (200ms), watermarks will be updated. After that according to the
> current watermark, data with the timestamp between the last watermark and
> current watermark will be released and go to the next steps (keyBy,
> timeWindow, reduce). If Flink received a data but an appropriate watermark
> didn't emit for that data yet, Flink didn't send that data to the next
> steps and keep it until it's appropriate watermark will be emitted.
>
> Is that correct?
>