Posted to user@flink.apache.org by Andrew Danks <a....@gmail.com> on 2018/10/11 22:23:05 UTC

When does Trigger.clear() get called?

Hello,

I see that the clear() function is implemented for various types of Triggers in the Flink API. For example:
https://github.com/apache/flink/blob/release-1.3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java#L87

I am working on a custom Trigger for my application and have implemented clear() in a similar way.

However, having put a breakpoint in this function, it doesn't seem to get called when I expect. The source code says that it is called "when a window is purged"[1], but when my Trigger emits a PURGE this function never seems to get called. I am on Flink 1.3.

Hoping someone can shed more light on the purpose of clear() and how/when it gets called.

Thanks!
Andrew


[1] https://github.com/apache/flink/blob/release-1.3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java#L111


Re: When does Trigger.clear() get called?

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Andrew,

You should call it manually, as the global window does not have a natural
end.

Best, Hequn
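
Editor's sketch: the advice above can be illustrated with a toy model in plain Java. This is not Flink code; the class, enum, and method names are hypothetical stand-ins chosen for illustration. It assumes a count-based trigger on a window whose end timestamp is Long.MAX_VALUE (which is what a global window reports as its maximum timestamp), so no end-of-window cleanup is ever due and the trigger resets its own state before it fires and purges.

```java
// Toy model only: a stand-in for a count-based trigger on a window that never ends.
// None of these types are Flink classes; the names are hypothetical.
class ManualClearTriggerSketch {
    enum Result { CONTINUE, FIRE_AND_PURGE }

    // A global window has no natural end, modelled here as Long.MAX_VALUE.
    static final long GLOBAL_WINDOW_END = Long.MAX_VALUE;

    private final long maxCount;
    private long count = 0; // custom trigger state

    ManualClearTriggerSketch(long maxCount) {
        this.maxCount = maxCount;
    }

    // Called once per element; fires and purges when maxCount elements have arrived.
    Result onElement() {
        count++;
        if (count >= maxCount) {
            clear(); // manual cleanup: no end-of-window cleanup will ever do it
            return Result.FIRE_AND_PURGE;
        }
        return Result.CONTINUE;
    }

    // Removes the trigger's custom state.
    void clear() {
        count = 0;
    }

    // End-of-window cleanup is due only once time exceeds the window end,
    // which cannot happen for Long.MAX_VALUE.
    boolean cleanupDue(long now) {
        return now > GLOBAL_WINDOW_END;
    }

    long currentCount() {
        return count;
    }

    public static void main(String[] args) {
        ManualClearTriggerSketch trigger = new ManualClearTriggerSketch(3);
        for (int i = 0; i < 3; i++) {
            System.out.println(trigger.onElement());
        }
        System.out.println("count after firing: " + trigger.currentCount());
    }
}
```

In a real Trigger the same idea would mean clearing the trigger's partitioned state inside onElement() (or a timer callback) before returning the firing result.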


Re: When does Trigger.clear() get called?

Posted by Andrew Danks <a....@gmail.com>.
Hi Fabian & Hequn,

Thank you for your responses. I am just responding now as I was out of office for the last few days.

You mentioned that clear() is called when the time exceeds the window’s end timestamp. For my application I am using a GlobalWindow on a keyed stream -- would clear() get called at all in this case or should I be calling it manually?


Andrew



Re: When does Trigger.clear() get called?

Posted by Averell <lv...@gmail.com>.
Thank you Fabian.

All my doubts are cleared now.

Best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: When does Trigger.clear() get called?

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

Re Q1: The main purpose of the Trigger.clear() method is to remove all
custom state of the Trigger. State must be explicitly removed; otherwise,
the program leaks memory.
Re Q3: If you are using a keyed stream, you need to manually clean up the
state by calling State.clear(). If you are using a ProcessFunction, you can
do that in processElement() or register a timer and clean up in onTimer().

Best, Fabian
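
Editor's sketch: the "register a timer and clean up in onTimer()" pattern mentioned above can be illustrated with a toy model in plain Java. This is not Flink code; plain maps stand in for keyed state, and the class name, method names, and the idle-timeout parameter are hypothetical. It assumes the goal is to drop a key's state once no event has arrived for that key within the timeout.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: plain maps stand in for keyed state and timers.
// None of these types are Flink classes; the names are hypothetical.
class IdleKeyCleanupSketch {
    private final long idleTimeout;
    private final Map<String, Long> countPerKey = new HashMap<>();    // "keyed state"
    private final Map<String, Long> lastSeenPerKey = new HashMap<>(); // latest event time per key

    IdleKeyCleanupSketch(long idleTimeout) {
        this.idleTimeout = idleTimeout;
    }

    // Models processElement(): update the state and return the timestamp
    // at which a cleanup timer should be registered for this key.
    long processElement(String key, long timestamp) {
        countPerKey.merge(key, 1L, Long::sum);
        lastSeenPerKey.put(key, timestamp);
        return timestamp + idleTimeout;
    }

    // Models onTimer(): clear the key's state only if no newer event
    // has arrived since this particular timer was registered.
    void onTimer(String key, long timerTimestamp) {
        Long lastSeen = lastSeenPerKey.get(key);
        if (lastSeen != null && timerTimestamp == lastSeen + idleTimeout) {
            countPerKey.remove(key);
            lastSeenPerKey.remove(key);
        }
    }

    boolean hasState(String key) {
        return countPerKey.containsKey(key);
    }

    public static void main(String[] args) {
        IdleKeyCleanupSketch sketch = new IdleKeyCleanupSketch(60L);
        long firstTimer = sketch.processElement("k", 100L);
        long secondTimer = sketch.processElement("k", 130L);
        sketch.onTimer("k", firstTimer);  // stale timer: a newer event arrived
        System.out.println("state after stale timer: " + sketch.hasState("k"));
        sketch.onTimer("k", secondTimer); // latest timer: the key has been idle
        System.out.println("state after latest timer: " + sketch.hasState("k"));
    }
}
```

Comparing the timer's timestamp with the last-seen time is a design choice that lets stale timers be ignored instead of deleted.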


Re: When does Trigger.clear() get called?

Posted by Averell <lv...@gmail.com>.
Hello Hequn,

Thanks for the answers.
Regarding question no.2, I am now clear.
Regarding question no.1, does your answer apply to those custom states as
well? This concern of mine came from Flink's implementation of CountTrigger,
in which a custom state is being cleared explicitly in Trigger.clear():

    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(stateDesc).clear();
    }

My 3rd question was for ordinary, non-windowed keyed streams, where I don't
see in Flink's document any mention of using Trigger, so how can I clear
those streams?

Thank you very much for your help.
Regards,
Averell
 




Re: When does Trigger.clear() get called?

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Averell,

> 1. Neither PURGE nor clear() removes the States (so the States must be
> explicitly cleared by the user).
Both PURGE and clear() remove state. The PURGE action removes the window
state, i.e., the aggregate value. clear() removes the window metadata,
including the state held by the Trigger.

> 2. When an event for a window arrives after PURGE has been called, it is
> still processed, and is treated as the first event of that window.
In most cases, the answer is yes. However, there is a chance that the event
is not treated as the first one by the trigger, since PURGE clears the
window state but the window metadata, including the Trigger, remains.

> if I know that some keys would never have new events anymore,
> should/could I remove those streams corresponding to those keys
Yes. I think we can return FIRE_AND_PURGE.

Best, Hequn




Re: When does Trigger.clear() get called?

Posted by Averell <lv...@gmail.com>.
Hello Fabian,

So could I assume the following?

1. Neither PURGE nor clear() removes the States (so the States must be
explicitly cleared by the user).
2. When an event for a window arrives after PURGE has been called, it is
still processed, and is treated as the first event of that window.

And one related question: for keyed streams, if I know that some keys would
never have new events anymore, should/could I remove those streams
corresponding to those keys so that I can save some memory allocated to the
metadata?

Thanks and best regards,
Averell




Re: When does Trigger.clear() get called?

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Andrew,

The PURGE action of a window removes the window state (i.e., the collected
events or computed aggregate), but the window metadata, including the
Trigger, remains.
The Trigger.clear() method is called when the window is completely discarded
(i.e., including all metadata). This happens when the time (wall-clock time
for processing-time windows or the watermark for event-time windows) exceeds
the window's end timestamp.

Best, Fabian
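
Editor's sketch: the lifecycle described above can be illustrated with a toy model in plain Java. This is not Flink's window operator; the class, fields, and methods are hypothetical stand-ins, and the "time exceeds the window end" check is a simplification of the wording in this message. It shows that a purge empties the window contents but leaves the trigger's custom state alone, while the end-of-window cleanup is what invokes the trigger's clear().

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: illustrative stand-ins, not Flink's window operator.
class WindowLifecycleSketch {
    final long windowEnd;                            // end timestamp of the window
    final List<String> contents = new ArrayList<>(); // "window state": collected events
    long triggerCounter = 0;                         // "trigger state": custom state of the trigger
    boolean discarded = false;                       // true once all metadata is gone
    int clearCalls = 0;                              // how often the trigger's clear() ran

    WindowLifecycleSketch(long windowEnd) {
        this.windowEnd = windowEnd;
    }

    // Models an arriving element: stored in the window and counted by the trigger.
    void onElement(String event) {
        contents.add(event);
        triggerCounter++;
    }

    // Models a PURGE result: only the window contents are dropped.
    void purge() {
        contents.clear();
    }

    // Models time (wall-clock time or watermark) advancing past the window end:
    // the window is discarded completely and the trigger is cleared.
    void advanceTime(long now) {
        if (!discarded && now > windowEnd) {
            contents.clear();
            clearTrigger();
            discarded = true;
        }
    }

    // Models Trigger.clear(): remove the trigger's custom state.
    private void clearTrigger() {
        triggerCounter = 0;
        clearCalls++;
    }

    public static void main(String[] args) {
        WindowLifecycleSketch window = new WindowLifecycleSketch(1000L);
        window.onElement("a");
        window.onElement("b");
        window.purge();
        System.out.println("after purge: contents=" + window.contents.size()
                + ", triggerCounter=" + window.triggerCounter
                + ", clearCalls=" + window.clearCalls);
        window.advanceTime(1001L);
        System.out.println("after window end: triggerCounter=" + window.triggerCounter
                + ", clearCalls=" + window.clearCalls);
    }
}
```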


Re: When does Trigger.clear() get called?

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Andrew,

Do you use a CountWindow? You can switch to a TimeWindow to test this.
I'm not very familiar with windows, but I checked the code and found that
clear() is called only when the timer is triggered, i.e., at the end of the
time window.
Hope this helps.

Best, Hequn
