Posted to user@flink.apache.org by Flavio Pompermaier <po...@okkam.it> on 2019/03/07 09:33:47 UTC

Timer question

Hi all,
I was writing a process function similar to the one described in the Flink
docs at [1].
Basically, I need to set a timeout before emitting elements.
However, the proposed approach creates a timer for every incoming
tuple. Isn't that dangerous if a key receives a very large burst of events?
You'll end up with tons of useless registered timers that take up
memory.
Isn't it better to remove obsolete timers? E.g.:

CountWithTimestamp current = state.value();
if (current == null) {
    current = new CountWithTimestamp();
    current.key = value.f0;
} else {
    ctx.timerService().deleteProcessingTimeTimer(current.lastModified + timeout);
}

Best,
Flavio

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
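
The concern raised in the question can be sketched in plain Java without a
Flink dependency. This is only a model under stated assumptions:
FakeTimerService is a hypothetical stand-in (it is not Flink's TimerService,
it merely borrows the method names registerEventTimeTimer and
deleteEventTimeTimer), timers are assumed to be unique per key and
timestamp, and the 60-second timeout is an illustrative value. It compares
how many timers stay pending for one key after a burst, with and without
deleting the obsolete timer first.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical stand-in for a per-key timer store; NOT Flink's TimerService. */
class FakeTimerService {
    // Assumption: at most one timer per (key, timestamp) pair.
    private final Map<String, Set<Long>> timersByKey = new HashMap<>();

    void registerEventTimeTimer(String key, long time) {
        timersByKey.computeIfAbsent(key, k -> new TreeSet<>()).add(time);
    }

    void deleteEventTimeTimer(String key, long time) {
        Set<Long> timers = timersByKey.get(key);
        if (timers != null) {
            timers.remove(time);
        }
    }

    int pendingTimers(String key) {
        Set<Long> timers = timersByKey.get(key);
        return timers == null ? 0 : timers.size();
    }
}

class TimerBurstDemo {
    // Illustrative timeout; the snippet in the question calls this "timeout".
    static final long TIMEOUT_MS = 60_000L;

    /**
     * Feeds `events` elements with timestamps 0..events-1 to a single key
     * and returns how many timers are still pending afterwards.
     */
    static int pendingAfterBurst(int events, boolean deleteObsolete) {
        FakeTimerService timers = new FakeTimerService();
        String key = "k";
        Long lastModified = null; // null until the first element arrives
        for (long ts = 0; ts < events; ts++) {
            if (deleteObsolete && lastModified != null) {
                // Drop the timer registered for the previous element.
                timers.deleteEventTimeTimer(key, lastModified + TIMEOUT_MS);
            }
            lastModified = ts;
            timers.registerEventTimeTimer(key, lastModified + TIMEOUT_MS);
        }
        return timers.pendingTimers(key);
    }

    public static void main(String[] args) {
        System.out.println("without delete: " + pendingAfterBurst(10_000, false));
        System.out.println("with delete:    " + pendingAfterBurst(10_000, true));
    }
}
```

In this model, every element with a distinct timestamp leaves one more
pending timer unless the previous one is deleted, in which case only the
most recent timer remains.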

Re: Timer question

Posted by Kostas Kloudas <kk...@gmail.com>.
Thanks!

Re: Timer question

Posted by Flavio Pompermaier <po...@okkam.it>.
Here is the JIRA for the moment: https://issues.apache.org/jira/browse/FLINK-11852

-- 
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 041809

Re: Timer question

Posted by Kostas Kloudas <kk...@gmail.com>.
I believe you are right.

It would be helpful to modify the example to delete the redundant timers
and to add some text explaining that timers are also state and that users
should pay attention to that.

Would you like to open a JIRA and submit a PR?

Cheers,
Kostas


Re: Timer question

Posted by Flavio Pompermaier <po...@okkam.it>.
Yes, you're right, Kostas. In my code I was using processing time, so I
forgot to replace it with event time (used by the example).
Maybe it would be worth mentioning this problem in the docs, e.g. as pros
and cons. What do you think?

Re: Timer question

Posted by Kostas Kloudas <kk...@gmail.com>.
Hi Flavio,

In general, deleting the redundant timers is definitely more
memory-friendly.
The code in the docs is presented the way it is for two reasons:
1) it is mainly for pedagogical purposes, and
2) when the docs were written, Flink's mechanism for deleting timers was
    not efficient, as it had to iterate over the whole list of registered
    timers.

The code you presented seems OK, apart from `deleteProcessingTimeTimer`,
which should be `deleteEventTimeTimer` in the case of the example in the
docs.

Cheers,
Kostas
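
The second reason above, that deletion once had to iterate over the whole
list of registered timers, can be illustrated with a small plain-Java
sketch. It does not reproduce Flink's internals; the class and method names
are hypothetical, and it only shows the general cost difference between
deleting an entry by scanning a list and deleting it from a hash-based set.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Illustrative only: generic deletion cost, not Flink's timer implementation. */
class TimerDeletionCost {

    /** Deletes `target` by scanning the list; returns how many entries were inspected. */
    static int deleteByScan(List<Long> timers, long target) {
        int inspected = 0;
        for (int i = 0; i < timers.size(); i++) {
            inspected++;
            if (timers.get(i) == target) { // Long is unboxed, so this compares values
                timers.remove(i);          // removes the element at index i
                break;
            }
        }
        return inspected;
    }

    /** Deletes `target` via a hash lookup; returns true if it was present. */
    static boolean deleteByHash(Set<Long> timers, long target) {
        return timers.remove(target);
    }

    public static void main(String[] args) {
        List<Long> list = new ArrayList<>();
        Set<Long> set = new HashSet<>();
        for (long t = 0; t < 100_000; t++) {
            list.add(t);
            set.add(t);
        }
        // Worst case for the scan: the timer to delete was registered last.
        System.out.println("entries inspected by scan: " + deleteByScan(list, 99_999L));
        System.out.println("removed via hash lookup:   " + deleteByHash(set, 99_999L));
    }
}
```

When the timer to delete was registered last, the scan inspects every entry,
so deleting on each incoming element would cost time proportional to the
number of pending timers.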
