You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Caizhi Weng <ts...@gmail.com> on 2021/09/01 02:58:02 UTC

Re: Clarifying Documentation on Custom Triggers

Hi!

I don't quite understand this problem. But if you look into
WindowedStream#trigger you'll find that the trigger
of WindowOperatorBuilder will change when you call that method, and thus
the default trigger will be overwritten by calling WindowedStream#trigger.

Aeden Jameson <ae...@gmail.com> 于2021年9月1日周三 上午12:32写道:

> Flink Version: 1.13.2
>
> In the section on Default Triggers of Window Assigners the documentation
> States
>
> By specifying a trigger using trigger() you are overwriting the default
> trigger of a WindowAssigner. For example, if you specify a CountTrigger
>  for TumblingEventTimeWindows you will no longer get window firings based
> on the progress of time but only by count.
>
> While I find the example given to be true because the CountTrigger doesn't
> FIRE in OnEventTime the timers of the default trigger are still being
> registered. Is that accurate? So when writing a custom trigger against the
> default trigger of a window one still needs to handle those timers and
> decide to fire them or not. It may be just me, but the documentation gives
> the impression that nothing would happen if one specifies a custom trigger
> with .trigger(...).  I'm I understanding what's going on in the code around
> timers, default and custom triggers correctly? If so it seems one solution
> to this would be to inherit from said WindowAssign and override the
> getDefaultTrigger method.
>
>
> --
> Thanks,
> Aeden
>
>
>

Re: Clarifying Documentation on Custom Triggers

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

If you look into the code of WindowOperator you'll see a cleanup timer is
registered for each element. This cleanup timer is used to deal with late
records. I suppose that is the timer which calls the onEventTime in your
trigger.

Trigger is a class for the user to decide whether to fire a window given
the current element or row time. Deciding which window an element belongs
to is the work of the window assigner, not the trigger. By using the
CountTrigger on a TumbleWindow, the window of the current element will be
fired when the counting exceeds the given threshold. For example if a
window contains 6 elements and the counting threshold is 5, the same window
will be fired 2 times. However if the window contains only 4 elements this
window will be lost, unless the CountTrigger responds to the cleanup timer.

Aeden Jameson <ae...@gmail.com> 于2021年9月2日周四 上午12:42写道:

> Hi Caizhi,
>
>    Thanks for responding. What i mean specifically is that if I do
> something like this
>
>         env
>             .addSource(events)
>             .keyBy(....)
>             .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>             .trigger(new EmptyTrigger())
>             .process(new MyProcessFunction())
>             .addSink(...)
>
> Where EmptyTrigger is defined as,
>
> public class EmptyTrigger extends Trigger<MyElement, TimeWindow> {
>     @Override
>     public TriggerResult onElement(MyElement element, long timestamp,
> TimeWindow window, TriggerContext ctx) throws Exception {
>         return TriggerResult.CONTINUE;
>     }
>
>     @Override
>     public TriggerResult onProcessingTime(long time, TimeWindow window,
> TriggerContext ctx) throws Exception {
>         System.out.println("Processing Time: " + time);
>         return TriggerResult.FIRE;
>     }
>
>     @Override
>     public TriggerResult onEventTime(long time, TimeWindow window,
> TriggerContext ctx) throws Exception {
>         System.out.println("Event Time: " + time);
>         return TriggerResult.FIRE;
>     }
>
>     @Override
>     public void clear(TimeWindow window, TriggerContext ctx) throws
> Exception {
>
>     }
> }
>
> I will see the onEventTime method above fire every second. However I have
> not registered any timers in the above EmptyTrigger. My question becomes
> where are those timers coming from? The documentation states,
>
> "By specifying a trigger using trigger() you are overwriting the default
> trigger of a WindowAssigner."
>
>
> This is the puzzling part to me about the above statement.
>
> Thanks,
> Aeden
>
>
>
>
>
>
> On Tue, Aug 31, 2021 at 7:58 PM Caizhi Weng <ts...@gmail.com> wrote:
>
>> Hi!
>>
>> I don't quite understand this problem. But if you look into
>> WindowedStream#trigger you'll find that the trigger
>> of WindowOperatorBuilder will change when you call that method, and thus
>> the default trigger will be overwritten by calling WindowedStream#trigger.
>>
>> Aeden Jameson <ae...@gmail.com> 于2021年9月1日周三 上午12:32写道:
>>
>>> Flink Version: 1.13.2
>>>
>>> In the section on Default Triggers of Window Assigners the documentation
>>> States
>>>
>>> By specifying a trigger using trigger() you are overwriting the default
>>> trigger of a WindowAssigner. For example, if you specify a CountTrigger
>>>  for TumblingEventTimeWindows you will no longer get window firings
>>> based on the progress of time but only by count.
>>>
>>> While I find the example given to be true because the CountTrigger
>>> doesn't FIRE in OnEventTime the timers of the default trigger are still
>>> being registered. Is that accurate? So when writing a custom trigger
>>> against the default trigger of a window one still needs to handle those
>>> timers and decide to fire them or not. It may be just me, but the
>>> documentation gives the impression that nothing would happen if one
>>> specifies a custom trigger with .trigger(...).  I'm I understanding what's
>>> going on in the code around timers, default and custom triggers correctly?
>>> If so it seems one solution to this would be to inherit from said
>>> WindowAssign and override the getDefaultTrigger method.
>>>
>>>
>>> --
>>> Thanks,
>>> Aeden
>>>
>>>
>>>
>

Re: Clarifying Documentation on Custom Triggers

Posted by Aeden Jameson <ae...@gmail.com>.
Hi Caizhi,

   Thanks for responding. What i mean specifically is that if I do
something like this

        env
            .addSource(events)
            .keyBy(....)
            .window(TumblingEventTimeWindows.of(Time.seconds(1)))
            .trigger(new EmptyTrigger())
            .process(new MyProcessFunction())
            .addSink(...)

Where EmptyTrigger is defined as,

public class EmptyTrigger extends Trigger<MyElement, TimeWindow> {
    @Override
    public TriggerResult onElement(MyElement element, long timestamp,
TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window,
TriggerContext ctx) throws Exception {
        System.out.println("Processing Time: " + time);
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window,
TriggerContext ctx) throws Exception {
        System.out.println("Event Time: " + time);
        return TriggerResult.FIRE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws
Exception {

    }
}

I will see the onEventTime method above fire every second. However I have
not registered any timers in the above EmptyTrigger. My question becomes
where are those timers coming from? The documentation states,

"By specifying a trigger using trigger() you are overwriting the default
trigger of a WindowAssigner."


This is the puzzling part to me about the above statement.

Thanks,
Aeden






On Tue, Aug 31, 2021 at 7:58 PM Caizhi Weng <ts...@gmail.com> wrote:

> Hi!
>
> I don't quite understand this problem. But if you look into
> WindowedStream#trigger you'll find that the trigger
> of WindowOperatorBuilder will change when you call that method, and thus
> the default trigger will be overwritten by calling WindowedStream#trigger.
>
> Aeden Jameson <ae...@gmail.com> 于2021年9月1日周三 上午12:32写道:
>
>> Flink Version: 1.13.2
>>
>> In the section on Default Triggers of Window Assigners the documentation
>> States
>>
>> By specifying a trigger using trigger() you are overwriting the default
>> trigger of a WindowAssigner. For example, if you specify a CountTrigger
>>  for TumblingEventTimeWindows you will no longer get window firings
>> based on the progress of time but only by count.
>>
>> While I find the example given to be true because the CountTrigger
>> doesn't FIRE in OnEventTime the timers of the default trigger are still
>> being registered. Is that accurate? So when writing a custom trigger
>> against the default trigger of a window one still needs to handle those
>> timers and decide to fire them or not. It may be just me, but the
>> documentation gives the impression that nothing would happen if one
>> specifies a custom trigger with .trigger(...).  I'm I understanding what's
>> going on in the code around timers, default and custom triggers correctly?
>> If so it seems one solution to this would be to inherit from said
>> WindowAssign and override the getDefaultTrigger method.
>>
>>
>> --
>> Thanks,
>> Aeden
>>
>>
>>