Posted to user@flink.apache.org by Scott Kidder <ki...@gmail.com> on 2018/10/19 17:14:22 UTC

Trigger Firing for Late Window Elements

I'm using event-time windows of 1 hour that have an allowed lateness of
several hours. This supports the processing of access logs that can be
delayed by several hours. The windows aggregate data over the 1 hour period
and write to a database sink. Pretty straightforward.

Will the event-time trigger lead to the window trigger firing for every
single late element? Suppose thousands of late elements arrive
simultaneously; I'd like to avoid having that lead to thousands of database
updates in a short period of time. Ideally, I could batch up the late
window changes and have it trigger when the window is finally closed or
some processing-time duration passes (e.g. once per minute).

For reference, here's what the aggregate window definition looks like with
Flink 1.5.3:

        chunkSource.keyBy(record -> record.getRecord().getEnvironmentId())
                .timeWindow(Time.hours(1))
                .allowedLateness(Time.hours(3))
                .aggregate(new EnvironmentAggregateWatchTimeFunction())
                .uid("env-watchtime-stats")
                .name("Env Watch-Time Stats")
                .addSink(new EnvironmentWatchTimeDBSink());
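
The batching idea described above (at most one update per processing-time interval) can be sketched in plain Java, independent of Flink. This is only an illustration of the coalescing logic; the class and method names are hypothetical, and in a real job the equivalent logic would presumably live in a custom window Trigger or in the sink, which is not shown here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: coalesce late updates so that at most one flush
// happens per processing-time interval.
final class LateUpdateBatcherSketch {

    /**
     * Given the processing-time arrival instants (ascending, in ms) of late
     * elements, returns the instants at which a batched update would be flushed.
     * The first element after a flush schedules the next flush one interval later;
     * elements arriving before that flush are absorbed into it.
     */
    static List<Long> flushTimes(long[] arrivalsMillis, long intervalMillis) {
        List<Long> flushes = new ArrayList<>();
        long pending = -1L; // -1 means no flush is currently scheduled
        for (long t : arrivalsMillis) {
            if (pending >= 0 && t >= pending) {
                flushes.add(pending); // the scheduled flush is due before this element
                pending = -1L;
            }
            if (pending < 0) {
                pending = t + intervalMillis; // schedule a flush for this new batch
            }
        }
        if (pending >= 0) {
            flushes.add(pending); // flush whatever is still buffered
        }
        return flushes;
    }

    public static void main(String[] args) {
        long[] arrivals = {0L, 10L, 500L, 59_999L, 60_000L, 61_000L, 200_000L};
        // Seven late elements, flushed once per minute at most.
        System.out.println(flushTimes(arrivals, 60_000L));
    }
}
```

With the sample arrivals in main, seven late elements result in three flushes instead of seven separate database updates.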


Thank you,

--
Scott Kidder

Re: Trigger Firing for Late Window Elements

Posted by Scott Kidder <ki...@gmail.com>.
That makes sense, thank you, Hequn. I can see the tradeoff between using
allowedLateness on a window to trigger multiple firings, versus a window
with a watermark lagging some amount of time (e.g. 3 hours) that has only a
single firing.
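
To make the tradeoff concrete, here is a small plain-Java simulation (no Flink dependency) that counts firings of a single window under both configurations. It is a simplified model, not Flink's implementation: the watermark is recomputed after every element as the maximum timestamp seen minus a lag, and all names and sample timestamps are hypothetical.

```java
// Simplified model of event-time window firing; names and data are hypothetical.
final class FiringCountSketch {
    static final long MIN = 60_000L;
    static final long HOUR = 60 * MIN;

    // Event timestamps in arrival order. The window under test is [0, 1h).
    static final long[] SAMPLE = {
        10 * MIN, 50 * MIN, 70 * MIN, 20 * MIN, 30 * MIN, 40 * MIN, 5 * HOUR, 15 * MIN
    };

    static int countFirings(long[] timestamps, long windowStart, long windowEnd,
                            long watermarkLag, long allowedLateness) {
        long maxTimestamp = windowEnd - 1;   // last timestamp that belongs to the window
        long watermark = Long.MIN_VALUE;     // watermark seen before the current element
        boolean buffered = false;            // window holds data that has not fired yet
        boolean firedOnTime = false;
        int firings = 0;
        for (long ts : timestamps) {
            boolean inWindow = ts >= windowStart && ts < windowEnd;
            // The element is kept only while the window has not been cleaned up.
            if (inWindow && maxTimestamp + allowedLateness > watermark) {
                if (maxTimestamp <= watermark) {
                    firings++;               // late element: the window fires again at once
                } else {
                    buffered = true;         // on-time element: wait for the watermark
                }
            }
            watermark = Math.max(watermark, ts - watermarkLag);
            if (buffered && !firedOnTime && watermark >= maxTimestamp) {
                firings++;                   // watermark passed the end of the window
                firedOnTime = true;
            }
        }
        return firings;
    }

    public static void main(String[] args) {
        System.out.println("allowedLateness 3h, no lag: "
                + countFirings(SAMPLE, 0, HOUR, 0, 3 * HOUR));
        System.out.println("watermark lag 3h, no lateness: "
                + countFirings(SAMPLE, 0, HOUR, 3 * HOUR, 0));
    }
}
```

In this sample both configurations accept the same five elements and drop the last one, but the first fires four times (once on time, then once per late element) while the second fires once.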

Thanks again,

--
Scott Kidder

Re: Trigger Firing for Late Window Elements

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Scott,

Yes, the window trigger fires for every single late element.

If you want the window to fire only once, you can:
    - Remove allowedLateness()
    - Use BoundedOutOfOrdernessTimestampExtractor to emit watermarks that
      lag behind the element timestamps.

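The code (Scala) looks like:

    // Assumption: Timestamp is java.sql.Timestamp; the thread does not say.
    import java.sql.Timestamp
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.windowing.time.Time

    // Watermarks trail the largest timestamp seen so far by 3 hours.
    class TimestampExtractor[T1, T2]
      extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.hours(3)) {
      // Event time is taken from the third tuple field.
      override def extractTimestamp(element: (T1, T2, Timestamp)): Long =
        element._3.getTime
    }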


Note that this increases latency: the window fires only once, after the
watermark (which lags 3 hours behind the largest timestamp seen) passes the
end of the window.
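
The latency effect can be sketched with a small plain-Java simulation (no Flink dependency). It assumes the watermark equals the largest timestamp seen so far minus a fixed lag and, as a simplification, recomputes it after every element rather than periodically; the class name and sample data are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a bounded-out-of-orderness watermark; names are hypothetical.
final class LaggingWatermarkSketch {
    static final long MIN = 60_000L;
    static final long HOUR = 60 * MIN;

    /**
     * Returns the arrival indices at which a window ending at windowEnd fires
     * when the watermark trails the largest timestamp seen by lag milliseconds.
     * Without allowed lateness the window fires at most once.
     */
    static List<Integer> firings(long[] timestamps, long windowEnd, long lag) {
        List<Integer> fired = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        boolean done = false;
        for (int i = 0; i < timestamps.length; i++) {
            maxSeen = Math.max(maxSeen, timestamps[i]);
            long watermark = maxSeen - lag;
            // The window fires once the watermark reaches its last timestamp (end - 1).
            if (!done && watermark >= windowEnd - 1) {
                fired.add(i);
                done = true;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Arrival order; the window under test is [0, 1h) and the lag is 3 hours.
        long[] ts = {30 * MIN, 12 * MIN, 2 * HOUR, 54 * MIN, 4 * HOUR, 6 * MIN};
        System.out.println(firings(ts, HOUR, 3 * HOUR));
    }
}
```

In this sample the window for [0, 1h) does not fire until an element stamped 4h arrives, which is the added latency; the element that arrives afterwards with a 6-minute timestamp is too late to be included.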

Best, Hequn
