Posted to user@flink.apache.org by Boris Lublinsky <bo...@lightbend.com> on 2018/08/31 11:42:32 UTC

Usage of "onTime" in ProcessFunction

I am effectively trying to simulate processing windows: results that are not complete in time should be dropped. I was trying to use the onTimer method in my Processor implementation for this.
I am not sure that I understand exactly how this works. When I start execution (in a different processor) I am executing
ctx.timerService.registerEventTimeTimer(currentTime + speculativeTimeout.value())
That is, I register the absolute cut-off time.

Is this the right usage? What happens when I have more than one timer started?


Boris Lublinsky
FDP Architect
boris.lublinsky@lightbend.com
https://www.lightbend.com/


Re: Usage of "onTime" in ProcessFunction

Posted by Boris Lublinsky <bo...@lightbend.com>.
Thanks, Andrey.
I do not have event time; I am dealing only with processing time.
My process gets 2 types of messages:
1. Start processing, which starts the timer, creates a GUID and outputs an event to another stream for the actual processing. Let's say this happens at time 45s and I want to make sure that my result comes back within the next 10s; otherwise I ignore the response. I then start a timer for time 55s.
2. Reply, which can come back either in time or later. If it is in time (the GUID is present), I send the reply back. If the GUID is not present, I ignore it.

onTimer basically removes the GUID that timed out from memory, so that I can ignore the late arrivals.

Now I can have several start requests, for times 45, 50 and 52. If my timeout is ten, I have timers for 55, 60 and 62.

Will all of them fire at these times, assuming that the timer’s processing time is 0?
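
The bookkeeping described above can be sketched in plain Java. This is only a model, not Flink code: the class PendingRequests and its methods start, advanceTo and reply are hypothetical names, time is a plain long in seconds, and a reply that arrives exactly at the cut-off is treated as late (an assumption). Under these assumptions each of the timers at 55, 60 and 62 fires separately, in timestamp order.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;

// Plain-Java model of the start/reply/timeout pattern; NOT Flink code.
// All class and method names are hypothetical.
class PendingRequests {
    private final long timeout;
    // GUIDs that are still allowed to receive a reply.
    private final Set<String> pending = new HashSet<>();
    // Registered timers: absolute firing time -> GUIDs expiring at that time.
    private final TreeMap<Long, List<String>> timers = new TreeMap<>();

    PendingRequests(long timeout) {
        this.timeout = timeout;
    }

    // "Start processing": remember the GUID and register a timer at now + timeout.
    void start(String guid, long now) {
        pending.add(guid);
        timers.computeIfAbsent(now + timeout, t -> new ArrayList<>()).add(guid);
    }

    // Advance the clock: every timer with timestamp <= now fires, in timestamp
    // order, and its onTimer-style callback drops the expired GUIDs.
    // Returns the timestamps of the timers that fired.
    List<Long> advanceTo(long now) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.firstKey() <= now) {
            Long ts = timers.firstKey();
            pending.removeAll(timers.remove(ts));
            fired.add(ts);
        }
        return fired;
    }

    // "Reply": accepted only if the GUID has not timed out yet.
    // An accepted GUID is consumed, so a second reply for it is ignored.
    boolean reply(String guid, long now) {
        advanceTo(now);
        return pending.remove(guid);
    }

    public static void main(String[] args) {
        PendingRequests p = new PendingRequests(10);
        p.start("a", 45); // timer at 55
        p.start("b", 50); // timer at 60
        p.start("c", 52); // timer at 62
        System.out.println(p.reply("b", 58)); // true: only the timer at 55 has fired
        System.out.println(p.advanceTo(70));  // [60, 62]
        System.out.println(p.reply("c", 71)); // false: c timed out at 62
    }
}
```

In this model the timer for "b" still fires at 60 even though its reply already arrived; the callback then simply finds nothing left to remove.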







Re: Usage of "onTime" in ProcessFunction

Posted by Andrey Zagrebin <an...@data-artisans.com>.
Hi,

The timers are scoped to the current key when you apply a processing function to a KeyedStream.
If you register more than one timer for a particular key and timestamp, you will get only one onTimer callback; see also the docs [1]. Timers registered in a processing function will trigger only in that processing function. All records and timer callbacks for a particular key are processed sequentially in one of the parallel instances of the operator.
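
As a rough illustration of the key scoping and deduplication described above, here is a small plain-Java model. It is not Flink's implementation: KeyedTimers, register and advanceTo are hypothetical names, and the callback order among keys that share a timestamp (alphabetical here) is just an artifact of the model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java model of key-scoped timers; NOT Flink's implementation.
// All class and method names are hypothetical.
class KeyedTimers {
    // timestamp -> keys that have a timer at that timestamp. Because the keys
    // are held in a set, registering the same (key, timestamp) pair twice
    // has no additional effect.
    private final TreeMap<Long, TreeSet<String>> timers = new TreeMap<>();

    void register(String key, long timestamp) {
        timers.computeIfAbsent(timestamp, t -> new TreeSet<>()).add(key);
    }

    // Fire every timer with timestamp <= now, in timestamp order.
    // Returns one "key@timestamp" entry per callback.
    List<String> advanceTo(long now) {
        List<String> callbacks = new ArrayList<>();
        while (!timers.isEmpty() && timers.firstKey() <= now) {
            Map.Entry<Long, TreeSet<String>> e = timers.pollFirstEntry();
            for (String key : e.getValue()) {
                callbacks.add(key + "@" + e.getKey());
            }
        }
        return callbacks;
    }

    public static void main(String[] args) {
        KeyedTimers t = new KeyedTimers();
        t.register("k1", 55);
        t.register("k1", 55); // duplicate for the same key and timestamp
        t.register("k2", 55); // same timestamp, different key
        t.register("k1", 60); // same key, different timestamp
        System.out.println(t.advanceTo(60)); // [k1@55, k2@55, k1@60]
    }
}
```

The duplicate registration for k1 at 55 yields a single callback, while different timestamps for the same key, and the same timestamp for different keys, each get their own.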

Depending on your use case, if you use an event-time timer, it might make sense to use the current watermark as the ‘currentTime’ in your code snippet.

Best,
Andrey

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/process_function.html#timers
