Posted to user@flink.apache.org by Alex Drobinsky <al...@gmail.com> on 2022/02/03 12:41:22 UTC

How to prevent check pointing of timers ?

Dear flink user,

In our project, restoring timer state creates numerous issues, so I
would like to know whether it is possible to avoid saving and restoring
timers altogether.
If it isn't possible, how could I delete all registered timers in the
open function?

Best regards,
Alexander

Re: How to prevent check pointing of timers ?

Posted by Alex Drobinsky <al...@gmail.com>.
Event time would be fine for window processing, or if all keyed streams
shared the same timeline; in our case, however, that does not always hold.
Imagine that half of the streams carry real-time timestamps and the other
half is historic data (in our case, from pcap files).
The real-time timestamps are far in the future relative to the historic
data, so they would trigger any timer immediately.
So we always use processing time; it works correctly as long as we are not
lagging behind.
Anyway, to summarise the situation:
it isn't trivial to prevent Flink from checkpointing timers, so the easiest
way to deal with the "immediate trigger" phenomenon is to implement a
workaround at the level of the onTimer function.
Thanks for your time, everybody!
Much appreciated,
Alex
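
One way to picture an onTimer-level workaround of the kind summarised above, as a minimal sketch in plain Java rather than Flink code. It assumes the operator remembers the time at which the current run started in a field that is not part of checkpointed state (for example, one set in open()), and treats any timer that was already due before that moment as stale. StaleTimerGuard, decide and rearmedDeadline are hypothetical names, not part of the Flink API, and this is a variation on the workaround in this thread, not the code actually used there.

```java
/** Plain-Java sketch; no Flink classes are used and all names are hypothetical. */
final class StaleTimerGuard {
    /** What the timer callback should do with a fired timer. */
    enum Action { CLOSE_SESSION, REARM }

    // Wall-clock time at which this run of the job started (assumed NOT checkpointed).
    private final long startedAtMillis;
    // Inactivity timeout, e.g. 60_000 for 60 seconds.
    private final long timeoutMillis;

    StaleTimerGuard(long startedAtMillis, long timeoutMillis) {
        this.startedAtMillis = startedAtMillis;
        this.timeoutMillis = timeoutMillis;
    }

    /** A timer that was due before this run started fired because of the downtime. */
    Action decide(long timerTimestampMillis) {
        return timerTimestampMillis < startedAtMillis ? Action.REARM : Action.CLOSE_SESSION;
    }

    /** Deadline to register again when decide(...) returned REARM. */
    long rearmedDeadline(long nowMillis) {
        return nowMillis + timeoutMillis;
    }

    public static void main(String[] args) {
        StaleTimerGuard guard = new StaleTimerGuard(1_000_000L, 60_000L);
        System.out.println(guard.decide(940_000L));          // due before the restart
        System.out.println(guard.decide(1_030_000L));        // due after the restart
        System.out.println(guard.rearmedDeadline(1_000_500L));
    }
}
```

Unlike a per-key "first call" flag, this check needs no extra field in the keyed state, but it only helps with timers that became due while the job was down.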

On Tue, Feb 8, 2022 at 09:49, Frank Dekervel <fr...@kapernikov.com> wrote:

> Hello,
>
> I guess you already evaluated moving to event time and were not able to?
> This really seems to be a case for event-time timers. I think that would
> require some effort (including choosing a good watermark strategy), but it
> would then solve all your problems.
>
> Frank

Re: How to prevent check pointing of timers ?

Posted by Frank Dekervel <fr...@kapernikov.com>.
Hello,

I guess you already evaluated moving to event time and were not able to?
This really seems to be a case for event-time timers. I think that would
require some effort (including choosing a good watermark strategy), but it
would then solve all your problems.

Frank
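
For readers weighing the event-time suggestion, here is a small plain-Java sketch (no Flink dependency) of how a watermark that trails the largest timestamp seen so far would treat an event-time timer, given that an event-time timer fires once the watermark reaches its timestamp. The 5-second out-of-orderness bound, the 60-second timeout, the sample timestamps and all names (WatermarkSketch, observe, wouldFire) are assumptions for illustration. The sketch models a single watermark shared by every key in the stream.

```java
/** Plain-Java model of a "max timestamp minus bound" watermark; names are hypothetical. */
final class WatermarkSketch {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestampSeen = Long.MIN_VALUE;

    WatermarkSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    /** Records the event timestamp of one incoming record. */
    void observe(long eventTimestampMillis) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMillis);
    }

    /** The watermark trails the largest timestamp seen by the configured bound. */
    long currentWatermark() {
        return maxTimestampSeen == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestampSeen - maxOutOfOrdernessMillis;
    }

    /** An event-time timer is due once the watermark has reached its timestamp. */
    boolean wouldFire(long timerTimestampMillis) {
        return timerTimestampMillis <= currentWatermark();
    }

    public static void main(String[] args) {
        long timeout = 60_000L;

        // One timeline: a session timer stays pending while event time advances slowly.
        WatermarkSketch single = new WatermarkSketch(5_000L);
        single.observe(1_000_000L);
        single.observe(1_010_000L);
        System.out.println(single.wouldFire(1_010_000L + timeout));

        // Mixed timelines: one live record pushes the watermark past every historic timer.
        WatermarkSketch mixed = new WatermarkSketch(5_000L);
        mixed.observe(1_000_000L);            // historic packet, e.g. replayed from a pcap file
        long historicTimer = 1_000_000L + timeout;
        mixed.observe(1_644_000_000_000L);    // live packet with a current timestamp
        System.out.println(mixed.wouldFire(historicTimer));
    }
}
```

The first case shows why event-time timers are unaffected by downtime when all data share one timeline; the second reproduces the concern raised in this thread when historic and live data share one watermark.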

On 08.02.22 08:42, Alex Drobinsky wrote:
> Sure :) The problem can be described as follows.
> Imagine you have a stream of data, for example network traffic.
> This network traffic is keyed by source address / source port /
> destination address / destination port / protocol type.
> Every connection can be "completed" in two ways:
> 1) we encounter a packet that indicates the end of the connection
> according to the protocol
> 2) we have not received any packet for that connection during the last
> 60 seconds
>
> In the second case, Flink calls the onTimer function and the session is
> closed.
> However, if a crash happens and a checkpoint is restored, onTimer is
> called immediately and the session is closed prematurely.
> Now, I would like to prevent this from happening, and I see two
> solutions. The first is the workaround you have already seen in a
> previous email: the first time onTimer is triggered, it ignores the
> call and resets the timer.
> The second is rather hypothetical: somehow forcing the timer to be
> volatile, or resetting the timer after restore. So the question is
> whether this second solution is feasible.

Re: How to prevent check pointing of timers ?

Posted by Alex Drobinsky <al...@gmail.com>.
Sure :) The problem can be described as follows.
Imagine you have a stream of data, for example network traffic.
This network traffic is keyed by source address / source port / destination
address / destination port / protocol type.
Every connection can be "completed" in two ways:
1) we encounter a packet that indicates the end of the connection according
to the protocol
2) we have not received any packet for that connection during the last 60
seconds

In the second case, Flink calls the onTimer function and the session is
closed.
However, if a crash happens and a checkpoint is restored, onTimer is called
immediately and the session is closed prematurely.
Now, I would like to prevent this from happening, and I see two solutions.
The first is the workaround you have already seen in a previous email: the
first time onTimer is triggered, it ignores the call and resets the timer.
The second is rather hypothetical: somehow forcing the timer to be volatile,
or resetting the timer after restore. So the question is whether this second
solution is feasible.
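
The two completion paths described above, and the premature close after a restore, can be traced with a small plain-Java sketch. This is not Flink code: SessionTracker, onPacket and isExpired are hypothetical names, the connection key is made up, and time is passed in explicitly so the behaviour is easy to follow.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of the 60-second inactivity timeout; all names are hypothetical. */
final class SessionTracker {
    static final long TIMEOUT_MILLIS = 60_000L;

    // Connection key -> processing-time deadline after which the session counts as idle.
    private final Map<String, Long> deadlines = new HashMap<>();

    /** Handles one packet; returns true if the packet itself closed the session. */
    boolean onPacket(String key, long nowMillis, boolean endOfConnection) {
        if (endOfConnection) {
            deadlines.remove(key);          // completion path 1: protocol says "done"
            return true;
        }
        deadlines.put(key, nowMillis + TIMEOUT_MILLIS);  // every packet pushes the deadline
        return false;
    }

    /** Completion path 2: the session is open and its deadline has passed. */
    boolean isExpired(String key, long nowMillis) {
        Long deadline = deadlines.get(key);
        return deadline != null && deadline <= nowMillis;
    }

    public static void main(String[] args) {
        SessionTracker tracker = new SessionTracker();
        String key = "10.0.0.1:1234->10.0.0.2:80/tcp";

        tracker.onPacket(key, 0L, false);                      // deadline = 60_000
        System.out.println(tracker.isExpired(key, 30_000L));   // only 30 s idle
        tracker.onPacket(key, 30_000L, false);                 // deadline = 90_000
        System.out.println(tracker.isExpired(key, 80_000L));   // still within the timeout

        // Simulated outage: the job is down and the clock is next read at 400_000,
        // while the restored deadline is still 90_000.
        System.out.println(tracker.isExpired(key, 400_000L));
    }
}
```

In the last check the session is reported as expired even though no packets could have been processed during the outage, which is the effect described in this message.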

On Tue, Feb 8, 2022 at 04:19, Yun Tang <my...@live.com> wrote:

> Hi Alex,
>
> I think the better approach is to first understand what problems you ran
> into when restoring the timers.
>
> Flink does not currently support removing state (including timer state).
>
> Best
> Yun Tang

Re: How to prevent check pointing of timers ?

Posted by Yun Tang <my...@live.com>.
Hi Alex,

I think the better approach is to first understand what problems you ran into when restoring the timers.

Flink does not currently support removing state (including timer state).

Best
Yun Tang
________________________________
From: Alex Drobinsky <al...@gmail.com>
Sent: Monday, February 7, 2022 21:09
To: Caizhi Weng <ts...@gmail.com>
Cc: User-Flink <us...@flink.apache.org>
Subject: Re: How to prevent check pointing of timers ?

By timer I mean a regular timer on keyed state, used via the onTimer function, for example:



public class StateWithTimer {
    public long timerValue = 0;
    public volatile boolean shouldResetTimer = true;

    public boolean resetIfMust(long timeoutInMilliseconds, TimerService timerService) {
        if (shouldResetTimer) {
            setupTimer(timeoutInMilliseconds, timerService);
            shouldResetTimer = false;
            return true;
        }
        return false;
    }

    public void setupTimer(long timeoutInMilliseconds, TimerService timerService) {
        // Cancel previous timer
        timerService.deleteProcessingTimeTimer(timerValue);
        // Register new timer
        // Should it be configurable ?
        timerValue = (timerService.currentProcessingTime() + timeoutInMilliseconds)*1000/1000;
        timerService.registerProcessingTimeTimer(timerValue);
    }

}

The state that uses timers extends StateWithTimer above. The function resetIfMust is the current workaround: it resets the timer the first time onTimer fires after a start or a restart from a checkpoint.


@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<ClassifierOutput> collector) throws Exception {
   MultiStorePacketState so = state.value();
   if (so.resetIfMust(StorePacketConfigurationParameters.partAggregationTimeout, ctx.timerService())) {
      return;
   }
   closeAndReportFile(collector, so);

   ctx.timerService().deleteProcessingTimeTimer(so.timerValue);
   state.update(so);
}

Re: How to prevent check pointing of timers ?

Posted by Alex Drobinsky <al...@gmail.com>.
By timer I mean a regular timer on keyed state, used via the onTimer
function, for example:


import org.apache.flink.streaming.api.TimerService;

public class StateWithTimer {
    public long timerValue = 0;
    public volatile boolean shouldResetTimer = true;

    // Re-registers the timer the first time it is called; returns true if it did so.
    public boolean resetIfMust(long timeoutInMilliseconds, TimerService timerService) {
        if (shouldResetTimer) {
            setupTimer(timeoutInMilliseconds, timerService);
            shouldResetTimer = false;
            return true;
        }
        return false;
    }

    public void setupTimer(long timeoutInMilliseconds, TimerService timerService) {
        // Cancel previous timer
        timerService.deleteProcessingTimeTimer(timerValue);
        // Register new timer
        // Should it be configurable ?
        timerValue = (timerService.currentProcessingTime() + timeoutInMilliseconds)*1000/1000;
        timerService.registerProcessingTimeTimer(timerValue);
    }

}


The state that uses timers extends StateWithTimer above. The function
resetIfMust is the current workaround: it resets the timer the first time
onTimer fires after a start or a restart from a checkpoint.

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<ClassifierOutput> collector) throws Exception {
   MultiStorePacketState so = state.value();
   // First call after (re)start: re-arm the timer instead of closing the session.
   if (so.resetIfMust(StorePacketConfigurationParameters.partAggregationTimeout, ctx.timerService())) {
      return;
   }
   closeAndReportFile(collector, so);

   ctx.timerService().deleteProcessingTimeTimer(so.timerValue);
   state.update(so);
}
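
Two details of the snippet above may be worth double-checking; the plain-Java sketch below illustrates the first one without any Flink dependency. In long arithmetic, multiplying by 1000 and then dividing by 1000 returns the value unchanged (barring overflow), so if the intent was to round timer timestamps down to whole seconds, the division has to come first. That intent is an assumption, and TimerRounding, asWritten and roundedDownToSecond are hypothetical names. Separately, `volatile` only affects visibility between threads and does not by itself exclude a field from serialization; `transient` is the Java keyword for that, and whether a transient field gets its initial value back after a restore depends on the serializer in use.

```java
/** Plain-Java illustration of the timestamp arithmetic; names are hypothetical. */
final class TimerRounding {
    /** The expression as posted: multiply first, then divide. */
    static long asWritten(long nowMillis, long timeoutMillis) {
        return (nowMillis + timeoutMillis) * 1000 / 1000;
    }

    /** Assumed intent: integer division first drops the milliseconds. */
    static long roundedDownToSecond(long nowMillis, long timeoutMillis) {
        return (nowMillis + timeoutMillis) / 1000 * 1000;
    }

    public static void main(String[] args) {
        long now = 1_644_000_000_123L;   // sample processing time in milliseconds
        long timeout = 60_000L;          // 60-second timeout
        System.out.println(asWritten(now, timeout));            // 1644000060123
        System.out.println(roundedDownToSecond(now, timeout));  // 1644000060000
    }
}
```

Rounding to a coarser granularity means that many keys share the same timer timestamp, at the cost of timers firing up to one second earlier than the exact deadline.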





On Mon, Feb 7, 2022 at 05:06, Caizhi Weng <ts...@gmail.com> wrote:

> Hi!
>
> Could you elaborate more on your code or share it if possible? Which timer
> are you talking about? Are you using the data stream API or SQL API? Do you
> mean the timer registered per record for a window aggregation? Does mini
> batch aggregation [1] solve your problem?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/tuning/#minibatch-aggregation

Re: How to prevent check pointing of timers ?

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

Could you elaborate more on your code or share it if possible? Which timer
are you talking about? Are you using the data stream API or SQL API? Do you
mean the timer registered per record for a window aggregation? Does mini
batch aggregation [1] solve your problem?

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/tuning/#minibatch-aggregation

On Thu, Feb 3, 2022 at 20:41, Alex Drobinsky <al...@gmail.com> wrote:

> Dear flink user,
>
> In our project, restoring timer state creates numerous issues, so I
> would like to know whether it is possible to avoid saving and restoring
> timers altogether.
> If it isn't possible, how could I delete all registered timers in the
> open function?
>
> Best regards,
> Alexander
>