You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Aeden Jameson <ae...@gmail.com> on 2021/08/31 16:32:00 UTC

Clarifying Documentation on Custom Triggers

Flink Version: 1.13.2

In the section on Default Triggers of Window Assigners the documentation
States

By specifying a trigger using trigger() you are overwriting the default
trigger of a WindowAssigner. For example, if you specify a CountTrigger for
TumblingEventTimeWindows you will no longer get window firings based on the
progress of time but only by count.

While I find the example given to be true because the CountTrigger doesn't
FIRE in OnEventTime the timers of the default trigger are still being
registered. Is that accurate? So when writing a custom trigger against the
default trigger of a window one still needs to handle those timers and
decide to fire them or not. It may be just me, but the documentation gives
the impression that nothing would happen if one specifies a custom trigger
with .trigger(...).  I'm I understanding what's going on in the code around
timers, default and custom triggers correctly? If so it seems one solution
to this would be to inherit from said WindowAssign and override the
getDefaultTrigger method.


-- 
Thanks,
Aeden

Re: Clarifying Documentation on Custom Triggers

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

If you look into the code of WindowOperator you'll see a cleanup timer is
registered for each element. This cleanup timer is used to deal with late
records. I suppose that is the timer which calls the onEventTime in your
trigger.

Trigger is a class for the user to decide whether to fire a window given
the current element or row time. Deciding which window an element belongs
to is the work of the window assigner, not the trigger. By using the
CountTrigger on a TumbleWindow, the window of the current element will be
fired when the counting exceeds the given threshold. For example if a
window contains 6 elements and the counting threshold is 5, the same window
will be fired 2 times. However if the window contains only 4 elements this
window will be lost, unless the CountTrigger responds to the cleanup timer.

Aeden Jameson <ae...@gmail.com> 于2021年9月2日周四 上午12:42写道:

> Hi Caizhi,
>
>    Thanks for responding. What i mean specifically is that if I do
> something like this
>
>         env
>             .addSource(events)
>             .keyBy(....)
>             .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>             .trigger(new EmptyTrigger())
>             .process(new MyProcessFunction())
>             .addSink(...)
>
> Where EmptyTrigger is defined as,
>
> public class EmptyTrigger extends Trigger<MyElement, TimeWindow> {
>     @Override
>     public TriggerResult onElement(MyElement element, long timestamp,
> TimeWindow window, TriggerContext ctx) throws Exception {
>         return TriggerResult.CONTINUE;
>     }
>
>     @Override
>     public TriggerResult onProcessingTime(long time, TimeWindow window,
> TriggerContext ctx) throws Exception {
>         System.out.println("Processing Time: " + time);
>         return TriggerResult.FIRE;
>     }
>
>     @Override
>     public TriggerResult onEventTime(long time, TimeWindow window,
> TriggerContext ctx) throws Exception {
>         System.out.println("Event Time: " + time);
>         return TriggerResult.FIRE;
>     }
>
>     @Override
>     public void clear(TimeWindow window, TriggerContext ctx) throws
> Exception {
>
>     }
> }
>
> I will see the onEventTime method above fire every second. However I have
> not registered any timers in the above EmptyTrigger. My question becomes
> where are those timers coming from? The documentation states,
>
> "By specifying a trigger using trigger() you are overwriting the default
> trigger of a WindowAssigner."
>
>
> This is the puzzling part to me about the above statement.
>
> Thanks,
> Aeden
>
>
>
>
>
>
> On Tue, Aug 31, 2021 at 7:58 PM Caizhi Weng <ts...@gmail.com> wrote:
>
>> Hi!
>>
>> I don't quite understand this problem. But if you look into
>> WindowedStream#trigger you'll find that the trigger
>> of WindowOperatorBuilder will change when you call that method, and thus
>> the default trigger will be overwritten by calling WindowedStream#trigger.
>>
>> Aeden Jameson <ae...@gmail.com> 于2021年9月1日周三 上午12:32写道:
>>
>>> Flink Version: 1.13.2
>>>
>>> In the section on Default Triggers of Window Assigners the documentation
>>> States
>>>
>>> By specifying a trigger using trigger() you are overwriting the default
>>> trigger of a WindowAssigner. For example, if you specify a CountTrigger
>>>  for TumblingEventTimeWindows you will no longer get window firings
>>> based on the progress of time but only by count.
>>>
>>> While I find the example given to be true because the CountTrigger
>>> doesn't FIRE in OnEventTime the timers of the default trigger are still
>>> being registered. Is that accurate? So when writing a custom trigger
>>> against the default trigger of a window one still needs to handle those
>>> timers and decide to fire them or not. It may be just me, but the
>>> documentation gives the impression that nothing would happen if one
>>> specifies a custom trigger with .trigger(...).  I'm I understanding what's
>>> going on in the code around timers, default and custom triggers correctly?
>>> If so it seems one solution to this would be to inherit from said
>>> WindowAssign and override the getDefaultTrigger method.
>>>
>>>
>>> --
>>> Thanks,
>>> Aeden
>>>
>>>
>>>
>

Re: Clarifying Documentation on Custom Triggers

Posted by Aeden Jameson <ae...@gmail.com>.
Hi Caizhi,

   Thanks for responding. What i mean specifically is that if I do
something like this

        env
            .addSource(events)
            .keyBy(....)
            .window(TumblingEventTimeWindows.of(Time.seconds(1)))
            .trigger(new EmptyTrigger())
            .process(new MyProcessFunction())
            .addSink(...)

Where EmptyTrigger is defined as,

public class EmptyTrigger extends Trigger<MyElement, TimeWindow> {
    @Override
    public TriggerResult onElement(MyElement element, long timestamp,
TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window,
TriggerContext ctx) throws Exception {
        System.out.println("Processing Time: " + time);
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window,
TriggerContext ctx) throws Exception {
        System.out.println("Event Time: " + time);
        return TriggerResult.FIRE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws
Exception {

    }
}

I will see the onEventTime method above fire every second. However I have
not registered any timers in the above EmptyTrigger. My question becomes
where are those timers coming from? The documentation states,

"By specifying a trigger using trigger() you are overwriting the default
trigger of a WindowAssigner."


This is the puzzling part to me about the above statement.

Thanks,
Aeden






On Tue, Aug 31, 2021 at 7:58 PM Caizhi Weng <ts...@gmail.com> wrote:

> Hi!
>
> I don't quite understand this problem. But if you look into
> WindowedStream#trigger you'll find that the trigger
> of WindowOperatorBuilder will change when you call that method, and thus
> the default trigger will be overwritten by calling WindowedStream#trigger.
>
> Aeden Jameson <ae...@gmail.com> 于2021年9月1日周三 上午12:32写道:
>
>> Flink Version: 1.13.2
>>
>> In the section on Default Triggers of Window Assigners the documentation
>> States
>>
>> By specifying a trigger using trigger() you are overwriting the default
>> trigger of a WindowAssigner. For example, if you specify a CountTrigger
>>  for TumblingEventTimeWindows you will no longer get window firings
>> based on the progress of time but only by count.
>>
>> While I find the example given to be true because the CountTrigger
>> doesn't FIRE in OnEventTime the timers of the default trigger are still
>> being registered. Is that accurate? So when writing a custom trigger
>> against the default trigger of a window one still needs to handle those
>> timers and decide to fire them or not. It may be just me, but the
>> documentation gives the impression that nothing would happen if one
>> specifies a custom trigger with .trigger(...).  I'm I understanding what's
>> going on in the code around timers, default and custom triggers correctly?
>> If so it seems one solution to this would be to inherit from said
>> WindowAssign and override the getDefaultTrigger method.
>>
>>
>> --
>> Thanks,
>> Aeden
>>
>>
>>

Re: Clarifying Documentation on Custom Triggers

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

I don't quite understand this problem. But if you look into
WindowedStream#trigger you'll find that the trigger
of WindowOperatorBuilder will change when you call that method, and thus
the default trigger will be overwritten by calling WindowedStream#trigger.

Aeden Jameson <ae...@gmail.com> 于2021年9月1日周三 上午12:32写道:

> Flink Version: 1.13.2
>
> In the section on Default Triggers of Window Assigners the documentation
> States
>
> By specifying a trigger using trigger() you are overwriting the default
> trigger of a WindowAssigner. For example, if you specify a CountTrigger
>  for TumblingEventTimeWindows you will no longer get window firings based
> on the progress of time but only by count.
>
> While I find the example given to be true because the CountTrigger doesn't
> FIRE in OnEventTime the timers of the default trigger are still being
> registered. Is that accurate? So when writing a custom trigger against the
> default trigger of a window one still needs to handle those timers and
> decide to fire them or not. It may be just me, but the documentation gives
> the impression that nothing would happen if one specifies a custom trigger
> with .trigger(...).  I'm I understanding what's going on in the code around
> timers, default and custom triggers correctly? If so it seems one solution
> to this would be to inherit from said WindowAssign and override the
> getDefaultTrigger method.
>
>
> --
> Thanks,
> Aeden
>
>
>