Posted to user@flink.apache.org by Navneeth Krishnan <re...@gmail.com> on 2017/09/05 04:49:49 UTC

Process Function

Hi All,

I have a streaming pipeline which is keyed by userid and then goes to a flatmap
function. I need to clear the state after some time, and I was looking at the
process function for it.

Inside the processElement function, if I register a timer, wouldn't it
create a timer for each incoming message?

// schedule the next timer 60 seconds from the current event time
        ctx.timerService().registerEventTimeTimer(current.timestamp + 60000);

How can I get something like a cleanup task that runs every 2 minutes and
evicts all stale data? Also, is there a way to get the key inside the onTimer
function so that I know which key has to be evicted?

Thanks,
Navneeth

Re: Process Function

Posted by Navneeth Krishnan <re...@gmail.com>.
Thanks a lot, everyone. I have the user data ingested from Kafka, and it is
keyed by userid. There are around 80 parallel flatmap operator instances
after the keyBy, and there are a few million users. The map state has the
userid as the key and some value. I guess I will try the approach that
Aljoscha mentioned and see how it works.

On Tue, Sep 5, 2017 at 8:17 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
>
> This is mostly correct, but you cannot register a timer in open() because
> we don't have an active key there. You can only register a timer in
> processElement() and onTimer().
>
> In your case, I would suggest somehow clamping the timestamp to the
> nearest 2-minute (or whatever) interval, or keeping an extra ValueState
> that tells you whether you have already registered a timer.
>
> Best,
> Aljoscha

Re: Process Function

Posted by Johannes Schulte <jo...@gmail.com>.
Thanks, that helped to see how we could implement this!

On Wed, Sep 6, 2017 at 12:01 PM, Timo Walther <tw...@apache.org> wrote:

> Hi Johannes,
>
> you can find the implementation for the state cleanup here:
> https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala
>
> and an example usage here:
> https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala
>
> Regards,
> Timo

Re: Process Function

Posted by Timo Walther <tw...@apache.org>.
Hi Johannes,

you can find the implementation for the state cleanup here:
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala

and an example usage here:
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala
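A plain-Java sketch (no Flink dependency) of one way a min/max retention scheme can avoid registering a timer per element. It is only loosely modeled on the idea behind the linked ProcessFunctionWithCleanupState, and the linked Scala source is authoritative. The class and method names are hypothetical, and the keyed state is simulated with a HashMap. A new cleanup timer is registered only when the pending one would fire less than the minimum retention time after the current element. Each new timer is placed the maximum retention time in the future, and only the most recently registered timer may clear the state.

```java
import java.util.HashMap;
import java.util.Map;

public class RetentionCleanupSketch {
    final long minRetentionMs;
    final long maxRetentionMs;
    // Stands in for a per-key ValueState<Long> holding the pending cleanup time.
    final Map<String, Long> cleanupTime = new HashMap<>();
    // Counts how many timers would have been registered with the timer service.
    int timersRegistered = 0;

    RetentionCleanupSketch(long minRetentionMs, long maxRetentionMs) {
        this.minRetentionMs = minRetentionMs;
        this.maxRetentionMs = maxRetentionMs;
    }

    // Called for every element of a key. A new timer is needed only if the
    // pending one would fire less than minRetentionMs after this element.
    void registerCleanupTimer(String key, long now) {
        Long current = cleanupTime.get(key);
        if (current == null || now + minRetentionMs > current.longValue()) {
            long next = now + maxRetentionMs;
            cleanupTime.put(key, next);
            timersRegistered++; // in a real job: register a processing-time timer at `next`
        }
    }

    // Called when a timer fires. Older, superseded timers find a newer
    // cleanup time in state and leave the state alone.
    boolean needToCleanup(String key, long timerTimestamp) {
        Long current = cleanupTime.get(key);
        return current != null && current.longValue() == timerTimestamp;
    }

    public static void main(String[] args) {
        // Assumed values: min retention 10 minutes, max retention 20 minutes.
        RetentionCleanupSketch s = new RetentionCleanupSketch(600_000L, 1_200_000L);
        for (long t = 0; t < 1_800_000L; t += 1000) {
            s.registerCleanupTimer("user-1", t); // one element per second for 30 minutes
        }
        System.out.println("timers registered: " + s.timersRegistered); // 3
        System.out.println("old timer cleans: " + s.needToCleanup("user-1", 1_200_000L)); // false
    }
}
```

With these values, 1,800 elements cause 3 timer registrations. The state is always cleared between the minimum and maximum retention time after the key's last element.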

Regards,
Timo


On 06.09.17 at 10:50, Aljoscha Krettek wrote:
> Hi,
>
> I'm actually not very familiar with the current Table API
> implementations, but Fabian or Timo (cc'ed) should know more. I strongly
> suspect that it is implemented like this, yes.
>
> Best,
> Aljoscha


Re: Process Function

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

I'm actually not very familiar with the current Table API implementations, but Fabian or Timo (cc'ed) should know more. I strongly suspect that it is implemented like this, yes.

Best,
Aljoscha

> On 5. Sep 2017, at 21:14, Johannes Schulte <jo...@gmail.com> wrote:
> 
> Hi,
> 
> One short question that fits here: when using higher-level (Table API) streaming, we can set a min and max retention time [1], which is probably used to reduce the number of timers registered under the hood. How is this implemented? By registering a "clamped" timer?
> 
> Thanks,
> 
> Johannes
> 
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/streaming.html#idle-state-retention-time


Re: Process Function

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

This is mostly correct, but you cannot register a timer in open() because we don't have an active key there. You can only register a timer in processElement() and onTimer().

In your case, I would suggest somehow clamping the timestamp to the nearest 2-minute (or whatever) interval, or keeping an extra ValueState that tells you whether you have already registered a timer.
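A plain-Java sketch (no Flink dependency) of the clamping suggestion. It assumes, as later versions of the Flink ProcessFunction documentation state, that the timer service keeps only one timer per key and timestamp. Under that assumption, rounding each cleanup time up to a 2-minute boundary makes most registrations collapse into an already existing timer. `SimulatedTimerService`, `clampUp`, and the interval and TTL values are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

public class TimerClampSketch {
    static final long INTERVAL_MS = 2 * 60 * 1000; // clamp interval: 2 minutes (assumed)
    static final long TTL_MS = 60_000;             // idle time before cleanup (assumed)

    // Hypothetical stand-in for a keyed timer service: it keeps one entry per
    // (key, timestamp), so re-registering an existing pair has no effect.
    static final class SimulatedTimerService {
        private final Set<String> timers = new HashSet<>();

        void register(String key, long timestamp) {
            timers.add(key + "@" + timestamp);
        }

        int size() {
            return timers.size();
        }
    }

    // Rounds t up to the next multiple of interval (assumes t >= 0), so a
    // cleanup timer fires less than one interval late and never early.
    static long clampUp(long t, long interval) {
        return ((t + interval - 1) / interval) * interval;
    }

    // Registers one cleanup timer per element, raw or clamped, and returns
    // how many distinct timers the service ends up holding.
    static int countTimers(String key, long[] eventTimes, boolean clamp) {
        SimulatedTimerService service = new SimulatedTimerService();
        for (long t : eventTimes) {
            long target = t + TTL_MS;
            service.register(key, clamp ? clampUp(target, INTERVAL_MS) : target);
        }
        return service.size();
    }

    public static void main(String[] args) {
        long[] times = new long[100]; // 100 elements for one key, one per second
        for (int i = 0; i < times.length; i++) {
            times[i] = i * 1000L;
        }
        System.out.println("raw timers:     " + countTimers("user-1", times, false)); // 100
        System.out.println("clamped timers: " + countTimers("user-1", times, true));  // 2
    }
}
```

Rounding up rather than to the nearest boundary means state is never dropped before its idle time has passed. At most, it is kept one interval longer than requested.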

Best,
Aljoscha

> On 5. Sep 2017, at 16:55, Kien Truong <du...@gmail.com> wrote:
> 
> Hi,
> 
> You can register a processing time timer inside the onTimer and the open function to have a timer that runs periodically.
> Pseudo-code example:
> 
> ValueState<Long> lastRuntime;
> 
> void open() {
>   ctx.timerService().registerProcessingTimeTimer(current.timestamp + 60000);
> }
> 
> void onTimer() {
>   // Run the periodic task
>   if (lastRuntime.value() + 60000 == timeStamp) {
>     periodicTask();
>   }
>   // Re-register the processing time timer
>   lastRuntime.update(timeStamp);
>   ctx.timerService().registerProcessingTimeTimer(current.timestamp + 60000);
> }
> 
> void periodicTask() { ... }
> 
> For the second question, timers are already scoped by key, so you can keep a lastModified variable as a ValueState
> and then compare it to the timestamp provided by the timer to see if the current key should be evicted.
> Check out the example on the ProcessFunction page.
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html
> 
> Best regards,
> Kien


Re: Process Function

Posted by Kien Truong <du...@gmail.com>.
Hi,

You can register a processing time timer inside the onTimer and the open
function to have a timer that runs periodically.

Pseudo-code example:

ValueState<Long> lastRuntime;
void open() {
  ctx.timerService().registerProcessingTimeTimer(current.timestamp + 60000);
}
void onTimer() {
  // Run the periodic task
  if (lastRuntime.value() + 60000 == timeStamp) {
    periodicTask();
  }
  // Re-register the processing time timer
  lastRuntime.update(timeStamp);
  ctx.timerService().registerProcessingTimeTimer(current.timestamp + 60000);
}
void periodicTask() { ... }
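A keyed timer needs an active key, so it cannot be armed from open(). The minimal plain-Java simulation below (no Flink dependency) therefore arms a self-re-arming per-key timer from the per-element callback instead. A per-key flag, standing in for a ValueState<Boolean>, ensures only the first element registers a timer. The timer callback then registers the next timer. The class, the 2-minute period, and the priority-queue clock are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;

public class PeriodicTimerSketch {
    static final long PERIOD_MS = 120_000; // 2-minute period (assumed)

    // A pending processing-time timer for one key.
    static final class Timer {
        final String key;
        final long timestamp;

        Timer(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    final PriorityQueue<Timer> timers =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));
    // Stands in for a per-key ValueState<Boolean> "timer already registered" flag.
    final Set<String> timerRegistered = new HashSet<>();
    // How many times the periodic task has run, per key.
    final Map<String, Integer> runs = new HashMap<>();

    // Analogue of processElement(): a key is active here, so the first timer
    // for that key is registered here, and only once thanks to the flag.
    void processElement(String key, long now) {
        if (timerRegistered.add(key)) {
            timers.add(new Timer(key, now + PERIOD_MS));
        }
    }

    // Analogue of onTimer(): run the periodic task, then re-arm the next timer.
    void onTimer(Timer t) {
        runs.merge(t.key, 1, Integer::sum);
        timers.add(new Timer(t.key, t.timestamp + PERIOD_MS));
    }

    // Advances the simulated processing-time clock and fires due timers in order.
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.peek().timestamp <= now) {
            onTimer(timers.poll());
        }
    }

    public static void main(String[] args) {
        PeriodicTimerSketch sim = new PeriodicTimerSketch();
        for (int i = 0; i < 50; i++) {
            sim.processElement("user-1", i * 1000L); // 50 elements, still one timer
        }
        sim.advanceTo(600_000L); // ten simulated minutes
        System.out.println(sim.runs + ", pending timers: " + sim.timers.size());
        // prints {user-1=5}, pending timers: 1
    }
}
```

As written, the timer keeps re-arming forever for every key it has seen. For cleanup, the timer callback would instead stop re-registering and reset the flag once the key's state has been cleared.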


For the second question, timers are already scoped by key, so you can
keep a lastModified variable as a ValueState and then compare it to the
timestamp provided by the timer to see if the current key should be
evicted. Check out the example on the ProcessFunction page.

https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html
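The eviction pattern described above can be simulated in plain Java as follows. It keeps a per-key lastModified value and sets a timer at lastModified + timeout, and that timer evicts the key only if the two still match. The HashMap stands in for keyed ValueState, and the priority queue stands in for the timer service. All names and the 60-second timeout are assumptions. The ProcessFunction page linked above shows the actual Flink version, whose state object also stores the key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

public class IdleEvictionSketch {
    static final long TIMEOUT_MS = 60_000; // idle timeout (assumed)

    // A pending timer for one key.
    static final class Timer {
        final String key;
        final long timestamp;

        Timer(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    // Stands in for a per-key ValueState<Long> holding lastModified.
    final Map<String, Long> lastModified = new HashMap<>();
    final PriorityQueue<Timer> timers =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));
    final List<String> evicted = new ArrayList<>();

    // Analogue of processElement(): remember the latest activity, schedule a check.
    void processElement(String key, long timestamp) {
        lastModified.put(key, timestamp);
        timers.add(new Timer(key, timestamp + TIMEOUT_MS));
    }

    // Analogue of onTimer(): evict only if no newer element arrived since this
    // timer was registered; timers from older elements are ignored.
    void onTimer(Timer t) {
        Long last = lastModified.get(t.key);
        if (last != null && t.timestamp == last.longValue() + TIMEOUT_MS) {
            lastModified.remove(t.key);
            evicted.add(t.key);
        }
    }

    // Advances the simulated clock and fires due timers in timestamp order.
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.peek().timestamp <= now) {
            onTimer(timers.poll());
        }
    }

    public static void main(String[] args) {
        IdleEvictionSketch sim = new IdleEvictionSketch();
        sim.processElement("alice", 0);
        sim.processElement("bob", 0);
        sim.processElement("alice", 30_000); // alice is still active
        sim.advanceTo(60_000);
        System.out.println("evicted=" + sim.evicted + " live=" + sim.lastModified.keySet());
        // prints evicted=[bob] live=[alice]
        sim.advanceTo(90_000);
        System.out.println("evicted=" + sim.evicted + " live=" + sim.lastModified.keySet());
        // prints evicted=[bob, alice] live=[]
    }
}
```

Every element still registers its own timer here. The sketch can be combined with rounding the timer target, as long as the comparison in onTimer uses the same rounded value.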

Best regards,
Kien

On 9/5/2017 11:49 AM, Navneeth Krishnan wrote:
> Hi All,
>
> I have a streaming pipeline which is keyed by userid and then goes to a
> flatmap function. I need to clear the state after some time, and I was
> looking at the process function for it.
>
> Inside the processElement function, if I register a timer, wouldn't it
> create a timer for each incoming message?
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.timestamp + 60000);
> How can I get something like a cleanup task that runs every 2 minutes
> and evicts all stale data? Also, is there a way to get the key inside
> the onTimer function so that I know which key has to be evicted?
>
> Thanks,
> Navneeth

Re: Process Function

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Navneeth,

Currently, I don't think there is any built-in functionality to trigger
onTimer periodically.
As for the second part of your question, do you mean that you want to query
which key the fired timer was registered for? I think this also isn't
possible right now.

I'm looping in Aljoscha in CC in case he has more insight on this.

Cheers,
Gordon


On Tue, Sep 5, 2017 at 4:55 PM, Biplob Biswas <re...@gmail.com>
wrote:

> How are you determining that your data is stale? Also, if you want to know
> the key, why don't you store the key in your state as well?
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>

Re: Process Function

Posted by Biplob Biswas <re...@gmail.com>.
How are you determining that your data is stale? Also, if you want to know the key,
why don't you store the key in your state as well?


