Posted to user@flink.apache.org by Andrea Spina <an...@radicalbit.io> on 2019/06/24 18:06:43 UTC

Process Function's timers "postponing"

Dear Community,
I am using Flink (processing-time) timers along with a Process Function.
What I would like to do is to "postpone" any timers already registered for
the given key. I might process plenty of events in a row (think of it as a
session), and I want to be able to trigger the computation "just after" this
session stops.

I thought about deleting any existing timers, but AFAIU I need to know the
previous timer's trigger time, which I guess is not possible for me since I
use processing-time timers.

I also read [1], but I am not sure whether it is useful in my case. For
instance, I don't understand what "Since Flink maintains only one timer per
key and timestamp..." means. Does this imply that a new PT timer will
automatically overwrite any previously existing one?

Thank you for your precious help,

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#timer-coalescing
-- 
*Andrea Spina*
Head of R&D @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT

Re: Process Function's timers "postponing"

Posted by Andrea Spina <an...@radicalbit.io>.
Hi Yun, thank you so much. That was an option, but I wanted to avoid storing
additional state for it. In the end, I went for coalescing, as the
documentation suggests, so that I will have just one timer per interval. What
I didn't catch initially from the documentation is that *for a given key and a
given timestamp, Flink retains just one timer, i.e. if I set two timers to
trigger at the same time T, Flink will fire the timer only once.*
I therefore accept having at least one coalesced timer per interval.

Thank you again for your support!
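
The coalescing mentioned above amounts to rounding the requested trigger time to a fixed granularity, so that every record of a key arriving within the same interval asks for the same timestamp. Below is a minimal sketch of that arithmetic in plain Java. The class and method names (`TimerCoalescing`, `coalesce`) and the chosen timeout and granularity are illustrative assumptions, not part of Flink; in a real job the result would presumably be passed to `ctx.timerService().registerProcessingTimeTimer(...)`.

```java
// Hypothetical helper, not part of Flink: rounds a timer target down to a
// fixed granularity so that nearby requests collapse onto one timestamp.
class TimerCoalescing {

    /** Returns (nowMs + timeoutMs) rounded down to a multiple of granularityMs. */
    static long coalesce(long nowMs, long timeoutMs, long granularityMs) {
        return ((nowMs + timeoutMs) / granularityMs) * granularityMs;
    }

    public static void main(String[] args) {
        long timeoutMs = 5_000L;      // assumed session timeout
        long granularityMs = 1_000L;  // assumed coalescing interval

        // Two records of the same key, 700 ms apart, within the same second.
        long first = coalesce(10_200L, timeoutMs, granularityMs);
        long second = coalesce(10_900L, timeoutMs, granularityMs);

        // Both map to the same timestamp, so registering both yields one timer.
        System.out.println(first + " " + second); // prints "15000 15000"
    }
}
```

Rounding down means the timer can fire up to one granularity step earlier than the exact timeout. A record arriving in the next interval still gets its own, later timer, which is why this approach gives one timer per interval rather than a single postponed timer.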


Re: Process Function's timers "postponing"

Posted by Yun Tang <my...@live.com>.
If you are using processing time, one possible way is to track the last registered timer in another ValueState<Long>. When you register a new timer and find that the previous timer stored in the ValueState has a smaller timestamp (T1) than the new one (T2), you can call #deleteProcessingTimeTimer(T1). Once that processing-time timer is deleted, T1 will not trigger any action. You could refer to [1] and its usage for similar ideas.


[1] https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/CleanupState.java
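
The idea above (remember the last registered timer, delete it, then register the new one) can be sketched as follows. To keep the example runnable without Flink on the classpath, `SimpleTimerService` is a stand-in interface that only mirrors three method names of Flink's `TimerService`, and a plain `Long` field plays the role of the `ValueState<Long>`. `PostponingTimer`, `FakeTimerService` and the one-second gap are hypothetical names and values chosen for illustration.

```java
import java.util.TreeSet;

// Hypothetical per-key logic: keeps at most one pending timer by deleting the
// previously registered one before registering its replacement.
class PostponingTimer {
    private final long gapMs;
    // Plays the role of the ValueState<Long> holding the last registered timer.
    private Long lastTimer = null;

    PostponingTimer(long gapMs) {
        this.gapMs = gapMs;
    }

    // Would be called from processElement(...) for every record of the key.
    void onElement(SimpleTimerService timers) {
        long next = timers.currentProcessingTime() + gapMs;
        if (lastTimer != null) {
            timers.deleteProcessingTimeTimer(lastTimer); // cancel T1
        }
        timers.registerProcessingTimeTimer(next);        // register T2
        lastTimer = next;
    }

    // Would be called from onTimer(...): the session is over, so clear the state.
    void onTimer(long timestamp) {
        lastTimer = null;
    }

    public static void main(String[] args) {
        FakeTimerService timers = new FakeTimerService();
        PostponingTimer perKey = new PostponingTimer(1_000L);

        timers.now = 100L;
        perKey.onElement(timers); // R1 at t1 = 100 registers T1 = 1100
        timers.now = 400L;
        perKey.onElement(timers); // R2 at t2 = 400 deletes T1, registers T2 = 1400

        System.out.println(timers.pending); // prints "[1400]"
    }
}

// Stand-in (assumption) mirroring three method names of Flink's TimerService.
interface SimpleTimerService {
    long currentProcessingTime();
    void registerProcessingTimeTimer(long time);
    void deleteProcessingTimeTimer(long time);
}

// In-memory fake used only by this demo; the clock is set by hand.
class FakeTimerService implements SimpleTimerService {
    long now = 0L;
    final TreeSet<Long> pending = new TreeSet<>();

    public long currentProcessingTime() { return now; }
    public void registerProcessingTimeTimer(long time) { pending.add(time); }
    public void deleteProcessingTimeTimer(long time) { pending.remove(time); }
}
```

The cost of this approach is one extra piece of keyed state per key, which is the trade-off against coalescing discussed elsewhere in this thread.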


Re: Process Function's timers "postponing"

Posted by Andrea Spina <an...@radicalbit.io>.
Hi Yun, thank you for your answer. I'm not sure I got your point. My
question is:
for the same key K, I process two records, R1 at t1 and R2 at t2.
When I process R1, I set a timer to be triggered at T1, which is > t2.
When I process R2, I set a timer to be triggered at T2, which is > T1, but
to do that I want to remove the previous timer T1 so as to "postpone" the
triggering.

In other words, I would like a single key to have just one active timer;
if a new timer is requested, the old one should be deleted.

Thank you,


Re: Process Function's timers "postponing"

Posted by Yun Tang <my...@live.com>.
Hi Andrea

If my understanding is correct, you just want to know when a registered timer is eventually deleted. When you register your timer, it is added to 'processingTimeTimersQueue' (where your timers are stored) at [1], and the 'SystemProcessingTimeService' then schedules a runnable TriggerTask to run after the "postpone" delay at [2]. When the scheduled runnable is triggered, it polls the timer from 'processingTimeTimersQueue' at [3], which means the timer is finally removed. Hope this helps.

Best
Yun Tang

[1] https://github.com/apache/flink/blob/adb7aab55feca41c4e4a2646ddc8bc272c022098/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L208
[2] https://github.com/apache/flink/blob/97d28761add07a1c3569254302a1705e8128f91c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java#L121
[3] https://github.com/apache/flink/blob/adb7aab55feca41c4e4a2646ddc8bc272c022098/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L237
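
The register/poll cycle described above, together with the "one timer per key and timestamp" rule asked about in the original question, can be pictured with a small in-memory model. This is a simplified, hypothetical sketch (`TimerQueueModel` and its methods are invented for illustration) and not Flink's `InternalTimerServiceImpl`. It only shows that a timer is identified by its (key, timestamp) pair, so a second registration with a different timestamp adds a timer rather than overwriting the first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Simplified, hypothetical model of a keyed timer queue; not Flink's implementation.
class TimerQueueModel {

    // A timer is identified by (timestamp, key); ordering is by timestamp, then key.
    static final class Timer implements Comparable<Timer> {
        final long timestamp;
        final String key;

        Timer(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }

        @Override
        public int compareTo(Timer other) {
            int byTime = Long.compare(timestamp, other.timestamp);
            return byTime != 0 ? byTime : key.compareTo(other.key);
        }
    }

    // A sorted set gives both firing order and de-duplication of identical timers.
    private final TreeSet<Timer> queue = new TreeSet<>();

    /** Returns false if the same (key, timestamp) pair was already registered. */
    boolean register(String key, long timestamp) {
        return queue.add(new Timer(timestamp, key));
    }

    /** Returns true if a matching timer was pending and has been removed. */
    boolean delete(String key, long timestamp) {
        return queue.remove(new Timer(timestamp, key));
    }

    /** Polls and returns every timer due at or before 'now', earliest first. */
    List<Timer> fire(long now) {
        List<Timer> due = new ArrayList<>();
        while (!queue.isEmpty() && queue.first().timestamp <= now) {
            due.add(queue.pollFirst());
        }
        return due;
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        TimerQueueModel timers = new TimerQueueModel();
        System.out.println(timers.register("K", 1_000L)); // true: new timer
        System.out.println(timers.register("K", 1_000L)); // false: same key and timestamp
        System.out.println(timers.register("K", 2_000L)); // true: later timer is added, not an overwrite
        System.out.println(timers.size());                // 2
        System.out.println(timers.fire(1_500L).size());   // 1: only the timer at 1000 is due
        System.out.println(timers.size());                // 1: the fired timer was polled from the queue
    }
}
```

In this model, "postponing" therefore requires an explicit `delete` of the earlier timestamp, which is why the earlier trigger time has to be known.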