Posted to user@flink.apache.org by "Afek, Ifat (Nokia - IL/Kfar Sava)" <if...@nokia.com> on 2022/02/23 16:24:47 UTC

Flink job recovery after task manager failure

Hi,

I am trying to use Flink checkpointing to support task manager recovery.
I’m running Flink via Beam with filesystem storage and the following parameters:
checkpointingInterval=30000
checkpointingMode=EXACTLY_ONCE

What I see is that if I kill a task manager pod, it takes Flink about 30 seconds to identify the failure and another 5-6 minutes to restart the jobs.
Is there a way to shorten the downtime? What downtime should I expect between a task manager being killed and the jobs being recovered? Are there any best practices for handling this (e.g. different configuration parameters)?

Thanks,
Ifat


Re: Flink job recovery after task manager failure

Posted by "Afek, Ifat (Nokia - IL/Kfar Sava)" <if...@nokia.com>.
Hi Zhilong,

I will check the issues you raised.

Thanks for your help,
Ifat

From: Zhilong Hong <zh...@gmail.com>
Date: Thursday, 24 February 2022 at 19:58
To: "Afek, Ifat (Nokia - IL/Kfar Sava)" <if...@nokia.com>
Cc: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Flink job recovery after task manager failure

Hi, Afek

I've read the log you provided. Since you've set restart-strategy to exponential-delay and restart-strategy.exponential-delay.initial-backoff to 10s, every time a failover is triggered, the JobManager has to wait 10 seconds before it restarts the job. If you'd prefer a quicker restart, you could change the restart strategy to fixed-delay and set a small value for restart-strategy.fixed-delay.delay.
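
To make the effect of the restart strategy concrete, here is a minimal Python sketch (not Flink code) of the wait before each consecutive restart attempt. The 10s initial backoff is the value mentioned above; the multiplier of 2.0, the 300s cap, and the 1s fixed delay are assumptions chosen for illustration, and any jitter is ignored.

```python
def exponential_delays(initial_s, attempts, multiplier=2.0, max_s=300.0):
    """Wait (in seconds) before each of `attempts` consecutive restarts.

    Assumed model: the wait starts at `initial_s`, is multiplied by
    `multiplier` after every failure, and never exceeds `max_s`.
    """
    delays = []
    delay = initial_s
    for _ in range(attempts):
        delays.append(min(delay, max_s))
        delay *= multiplier
    return delays


def fixed_delays(delay_s, attempts):
    """Wait (in seconds) before each restart with a constant delay."""
    return [delay_s] * attempts


if __name__ == "__main__":
    # Three consecutive failovers, as in the log discussed in this thread.
    exp = exponential_delays(10.0, 3)
    fixed = fixed_delays(1.0, 3)
    print("exponential-delay waits:", exp, "total:", sum(exp))
    print("fixed-delay waits:", fixed, "total:", sum(fixed))
```

Under these assumptions, repeated failovers in quick succession cost far more waiting time with exponential-delay than with a short fixed delay, which is why the choice matters when recovery itself fails a few times.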

Furthermore, two more failovers happened during the initialization of the recovered tasks. During initialization, a task tries to restore its state from the last valid checkpoint, and a FileNotFoundException was thrown during this recovery process. I'm not sure of the reason. Since the recovery succeeded after two failovers, I think it may be because the local disks of your cluster are not stable.

Sincerely,
Zhilong

On Thu, Feb 24, 2022 at 9:04 PM Afek, Ifat (Nokia - IL/Kfar Sava) <if...@nokia.com> wrote:
Thanks Zhilong.

The first launch of our job is fast, so I don’t think that’s the issue. I see in the Flink job manager log that there were several exceptions during the restart, and the task manager was restarted a few times until it stabilized.

You can find the log here:
jobmanager-log.txt.gz<https://nokia-my.sharepoint.com/:u:/p/ifat_afek/EUsu4rb_-BpNrkpvSwzI-vgBtBO9OQlIm0CHtW0gsZ7Gqg?email=zhlonghong%40gmail.com&e=ww5Idt>

Thanks,
Ifat

From: Zhilong Hong <zh...@gmail.com>
Date: Wednesday, 23 February 2022 at 19:38
To: "Afek, Ifat (Nokia - IL/Kfar Sava)" <if...@nokia.com>
Cc: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Flink job recovery after task manager failure

Hi, Afek!

When a TaskManager is killed, the JobManager is not notified until a heartbeat timeout occurs. Currently, the default value of heartbeat.timeout is 50 seconds [1]. That's why it takes more than 30 seconds for Flink to trigger a failover. If you'd like to shorten the time before a failover is triggered in this situation, you could decrease the value of heartbeat.timeout in flink-conf.yaml. However, if the value is too small, heartbeat timeouts will happen more frequently and the cluster will be unstable. As FLINK-23403 [2] mentions, if you are using Flink 1.14 or 1.15, you could try setting the value to 10s.
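
As a rough illustration of this timing, the following Python sketch models failure detection as "the last heartbeat received plus heartbeat.timeout". It is a simplified model, not Flink's implementation; the heartbeat intervals (10s and 2s) and the kill time used in the example are assumptions.

```python
def detection_delay_s(kill_time_s, heartbeat_interval_s, heartbeat_timeout_s):
    """Seconds between a TaskManager being killed and the timeout firing.

    Simplified model: heartbeats arrive at 0, interval, 2*interval, ...
    and the timeout fires `heartbeat_timeout_s` after the last heartbeat
    that was received at or before the kill.
    """
    last_heartbeat_s = (kill_time_s // heartbeat_interval_s) * heartbeat_interval_s
    return last_heartbeat_s + heartbeat_timeout_s - kill_time_s


if __name__ == "__main__":
    # (interval, timeout) pairs; the intervals are illustrative assumptions.
    for interval, timeout in ((10, 50), (2, 10)):
        delay = detection_delay_s(27, interval, timeout)
        print(f"interval={interval}s timeout={timeout}s -> detected after {delay}s")
```

In this model the detection delay is always at most heartbeat.timeout and at least the timeout minus one interval, so lowering the timeout is what shortens the detection time.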

You mentioned that it takes 5-6 minutes to restart the jobs, which seems unusually long. How long does it take to deploy your job on a brand new launch? You could compress the JobManager log, upload it to Google Drive or OneDrive, and attach the sharing link. Maybe we can find out what happens from the log.

Sincerely,
Zhilong

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#heartbeat-timeout
[2] https://issues.apache.org/jira/browse/FLINK-23403



Re: Flink job recovery after task manager failure

Posted by yidan zhao <hi...@gmail.com>.
I think you should use NFS, which is easier to deploy than HDFS.
The checkpoint state is written and read by the TaskManagers.
ZooKeeper is only used to record checkpoint metadata, such as the
checkpoint file path.

Finally, I don't think your job can be recovered normally if you are not
running with shared storage.
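
One way to sanity-check this is to look at the URI of the checkpoint directory: a `file:` path is resolved on each pod's own filesystem, so it is only shared if the same volume (for example NFS) is mounted at that path on the JobManager and on every TaskManager. The Python sketch below is a hypothetical helper, not part of Flink; the `file:` path is the one that appears in the log in this thread, and the `hdfs://` URI is made up for illustration.

```python
from urllib.parse import urlparse


def is_node_local_path(checkpoint_dir):
    """True if the URI is resolved against each pod's own filesystem.

    A path without a scheme is treated here like `file:`. Such a
    directory is only usable for recovery when the same volume is
    mounted there on every JobManager and TaskManager pod.
    """
    scheme = urlparse(checkpoint_dir).scheme
    return scheme in ("", "file")


if __name__ == "__main__":
    for uri in ("file:/flink_state/checkpoints",
                "hdfs://namenode:8020/flink/checkpoints"):
        if is_node_local_path(uri):
            note = "needs the same shared mount on all pods"
        else:
            note = "remote storage"
        print(uri, "->", note)
```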



Re: Flink job recovery after task manager failure

Posted by "Afek, Ifat (Nokia - IL/Kfar Sava)" <if...@nokia.com>.
Thanks.

At the moment I’m running without shared file storage, and I’m trying to better understand the recovery process.

The behavior I see is as follows:

  *   The job manager identifies the task manager’s failure and tries to restart
  *   The job manager fails to find the checkpoint. Looking inside the pods, I see that the checkpoints are stored in the task manager (after it was restarted by k8s), but not in the job manager.
  *   The job manager gives up after a few retries
  *   The job manager succeeds in getting the checkpoint from ZooKeeper
  *   The job manager successfully restarts the task manager

I’d appreciate your help in understanding this behavior:
Who is responsible for writing and then reading the checkpoint data? The task manager or the job manager? Should the job manager and task manager access the same storage, so the job manager can retrieve the task manager checkpoints?
What is ZooKeeper’s role in this?
And is there a way for me to improve this process or shorten the time it takes?

The relevant section in my log:


java.lang.Exception: Exception while creating StreamOperatorStateContext.
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for DoFnOperator_764f1681beaffc88f686a6a1532b6deb_(1/1) from any of the 1 provided restore options.
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:285) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:173) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        ... 10 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
        at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createOperatorStateBackend(HashMapStateBackend.java:148) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:276) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:285) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:173) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        ... 10 more
Caused by: java.io.FileNotFoundException: /flink_state/checkpoints/523f9e48274186bb97c13e3c2213be0e/chk-8/07e16ec0-0415-4aff-8981-5305e2b618f6 (No such file or directory)
        at java.io.FileInputStream.open0(Native Method) ~[?:?]
        at java.io.FileInputStream.open(Unknown Source) ~[?:?]
        at java.io.FileInputStream.<init>(Unknown Source) ~[?:?]
        at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:87) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.runtime.state.OperatorStreamStateHandle.openInputStream(OperatorStreamStateHandle.java:66) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:72) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:80) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createOperatorStateBackend(HashMapStateBackend.java:148) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:276) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:285) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:173) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
        ... 10 more


2022-02-24 12:18:57,786 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task 55f0dd6ce96daa34877e5b041baadee7_0.

2022-02-24 12:18:57,786 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 2 tasks should be restarted to recover the failed task 55f0dd6ce96daa34877e5b041baadee7_0.

2022-02-24 12:18:57,786 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job AmfEventsToCentral (523f9e48274186bb97c13e3c2213be0e) switched from state RUNNING to RESTARTING.

2022-02-24 12:18:57,786 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: readFromKafka/KafkaIO.Read.ReadFromKafkaViaUnbounded/Read(KafkaUnboundedSource) -> Flat Map -> convertAmfNotificationsToEvents/ParMultiDo(ConvertAmfNotificationsToEvents) -> WriteToAvro/MapElements/Map/ParMultiDo(Anonymous) -> WriteToAvro/PubsubUnboundedSink/Output Serialized PubsubMessage Proto/Map/ParMultiDo(Anonymous) -> WriteToAvro/PubsubUnboundedSink/PubsubUnboundedSink.PubsubSink/PubsubUnboundedSink.Window/Window.Assign.out -> WriteToAvro/PubsubUnboundedSink/PubsubUnboundedSink.PubsubSink/PubsubUnboundedSink.Shard/ParMultiDo(Shard) -> ToBinaryKeyedWorkItem (1/1) (cc9e2f14636aae06879e455656eab3ca) switched from INITIALIZING to CANCELING.

2022-02-24 12:18:57,853 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: readFromKafka/KafkaIO.Read.ReadFromKafkaViaUnbounded/Read(KafkaUnboundedSource) -> Flat Map -> convertAmfNotificationsToEvents/ParMultiDo(ConvertAmfNotificationsToEvents) -> WriteToAvro/MapElements/Map/ParMultiDo(Anonymous) -> WriteToAvro/PubsubUnboundedSink/Output Serialized PubsubMessage Proto/Map/ParMultiDo(Anonymous) -> WriteToAvro/PubsubUnboundedSink/PubsubUnboundedSink.PubsubSink/PubsubUnboundedSink.Window/Window.Assign.out -> WriteToAvro/PubsubUnboundedSink/PubsubUnboundedSink.PubsubSink/PubsubUnboundedSink.Shard/ParMultiDo(Shard) -> ToBinaryKeyedWorkItem (1/1) (cc9e2f14636aae06879e455656eab3ca) switched from CANCELING to CANCELED.

2022-02-24 12:18:57,854 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Clearing resource requirements of job 523f9e48274186bb97c13e3c2213be0e

2022-02-24 12:19:52,032 INFO  org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - Releasing slot [28162b793b27f93fe4463e330e4cbc33].

2022-02-24 12:20:16,702 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job AmfEventsToCentral (523f9e48274186bb97c13e3c2213be0e) switched from state RESTARTING to RUNNING.

2022-02-24 12:20:16,704 INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Recovering checkpoints from ZooKeeperStateHandleStore{namespace='flink/default/checkpoints/523f9e48274186bb97c13e3c2213be0e'}.

2022-02-24 12:20:16,712 INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Found 1 checkpoints in ZooKeeperStateHandleStore{namespace='flink/default/checkpoints/523f9e48274186bb97c13e3c2213be0e'}.

2022-02-24 12:20:16,712 INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - All 1 checkpoints found are already downloaded.

2022-02-24 12:20:16,712 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Restoring job 523f9e48274186bb97c13e3c2213be0e from Checkpoint 8 @ 1645704948497 for 523f9e48274186bb97c13e3c2213be0e located at file:/flink_state/checkpoints/523f9e48274186bb97c13e3c2213be0e/chk-8.

2022-02-24 12:20:16,713 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - No master state to restore

2


Thanks,
Ifat

From: yidan zhao <hi...@gmail.com>
Date: Wednesday, 2 March 2022 at 4:08
To: "Afek, Ifat (Nokia - IL/Kfar Sava)" <if...@nokia.com>
Cc: zhlonghong <zh...@gmail.com>, "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Flink job recovery after task manager failure

The state backend can be set to hashmap or rocksdb.
Checkpoint storage must be on a shared file system (NFS, HDFS, or similar).

On Wed, Mar 2, 2022 at 05:55 Afek, Ifat (Nokia - IL/Kfar Sava) <if...@nokia.com> wrote:
Hi,

I’m trying to understand the guidelines for task manager recovery.
From what I see in the documentation, the state backend can be set to in-memory, file system, or RocksDB, and the checkpoint storage requires a shared file system for both the file system and RocksDB backends. Is that correct? Must the file system be shared between the task managers and job managers? Is there another option?

Thanks,
Ifat

From: Zhilong Hong <zh...@gmail.com>
Date: Thursday, 24 February 2022 at 19:58
To: "Afek, Ifat (Nokia - IL/Kfar Sava)" <if...@nokia.com>
Cc: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Flink job recovery after task manager failure

Hi, Afek

I've read the log you provided. Since you've set restart-strategy to exponential-delay and restart-strategy.exponential-delay.initial-backoff to 10s, every time a failover is triggered, the JobManager has to wait 10 seconds before it restarts the job. If you'd prefer a quicker restart, you could change the restart strategy to fixed-delay and set a small value for restart-strategy.fixed-delay.delay.

Furthermore, two more failovers happened during the initialization of the recovered tasks. During initialization, a task tries to recover its state from the last valid checkpoint. A FileNotFound exception occurred during this recovery process. I'm not quite sure of the reason. Since the recovery succeeded after two failovers, I think it may be because the local disks of your cluster are not stable.

Sincerely,
Zhilong

On Thu, Feb 24, 2022 at 9:04 PM Afek, Ifat (Nokia - IL/Kfar Sava) <if...@nokia.com> wrote:
Thanks Zhilong.

The first launch of our job is fast, so I don’t think that’s the issue. I see in the Flink job manager log that there were several exceptions during the restart, and the task manager was restarted a few times until it stabilized.

You can find the log here:
jobmanager-log.txt.gz<https://nokia-my.sharepoint.com/:u:/p/ifat_afek/EUsu4rb_-BpNrkpvSwzI-vgBtBO9OQlIm0CHtW0gsZ7Gqg?email=zhlonghong%40gmail.com&e=ww5Idt>

Thanks,
Ifat

From: Zhilong Hong <zh...@gmail.com>
Date: Wednesday, 23 February 2022 at 19:38
To: "Afek, Ifat (Nokia - IL/Kfar Sava)" <if...@nokia.com>
Cc: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Flink job recovery after task manager failure

Hi, Afek!

When a TaskManager is killed, the JobManager is not notified until a heartbeat timeout occurs. Currently, the default value of heartbeat.timeout is 50 seconds [1]. That's why it takes more than 30 seconds for Flink to trigger a failover. If you'd like to shorten the time until a failover is triggered in this situation, you could decrease the value of heartbeat.timeout in flink-conf.yaml. However, if the value is set too small, heartbeat timeouts will happen more frequently and the cluster will be unstable. As FLINK-23403 [2] mentions, if you are using Flink 1.14 or 1.15, you could try setting the value to 10s.

You mentioned that it takes 5-6 minutes to restart the jobs. That seems a bit odd. How long does it take to deploy your job on a brand-new launch? You could compress the JobManager log, upload it to Google Drive or OneDrive, and attach the sharing link. Maybe we can find out what happened from the log.

Sincerely,
Zhilong

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#heartbeat-timeout
[2] https://issues.apache.org/jira/browse/FLINK-23403

On Thu, Feb 24, 2022 at 12:25 AM Afek, Ifat (Nokia - IL/Kfar Sava) <if...@nokia.com> wrote:
Hi,

I am trying to use Flink checkpoints solution in order to support task manager recovery.
I’m running flink using beam with filesystem storage and the following parameters:
checkpointingInterval=30000
checkpointingMode=EXACTLY_ONCE.

What I see is that if I kill a task manager pod, it takes flink about 30 seconds to identify the failure and another 5-6 minutes to restart the jobs.
Is there a way to shorten the downtime? What is an expected downtime in case the task manager is killed, until the jobs are recovered? Are there any best practices for handling it? (e.g. different configuration parameters)

Thanks,
Ifat


Re: Flink job recovery after task manager failure

Posted by yidan zhao <hi...@gmail.com>.
The state backend can be set to hashmap or rocksdb.
Checkpoint storage must be on a shared file system (NFS, HDFS, or
similar).
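
As a rough sketch, this could look like the following in flink-conf.yaml.
The two keys are standard Flink options; the directory is a made-up
placeholder and would have to point at storage that the JobManager and
every TaskManager can all reach. The log earlier in this thread shows
checkpoints under file:/flink_state/checkpoints; a file: path like that
only works if the same volume is mounted on all pods.

```yaml
# flink-conf.yaml (sketch; the path below is hypothetical)
state.backend: rocksdb        # or: hashmap
# Must resolve to the same shared storage on the JobManager and all TaskManagers.
# An hdfs:// URI is another option instead of a shared file: mount.
state.checkpoints.dir: file:///mnt/shared/flink/checkpoints
```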


Re: Flink job recovery after task manager failure

Posted by "Afek, Ifat (Nokia - IL/Kfar Sava)" <if...@nokia.com>.
Hi,

I’m trying to understand the guidelines for task manager recovery.
From what I see in the documentation, the state backend can be set to in-memory, file system, or RocksDB, and the checkpoint storage requires a shared file system for both the file system and RocksDB backends. Is that correct? Must the file system be shared between the task managers and job managers? Is there another option?

Thanks,
Ifat



Re: Flink job recovery after task manager failure

Posted by Zhilong Hong <zh...@gmail.com>.
Hi, Afek

I've read the log you provided. Since you've set restart-strategy to
exponential-delay and restart-strategy.exponential-delay.initial-backoff
to 10s, every time a failover is triggered, the JobManager has to wait
10 seconds before it restarts the job. If you'd prefer a quicker restart,
you could change the restart strategy to fixed-delay and set a small value
for restart-strategy.fixed-delay.delay.
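
For illustration, a fixed-delay setup in flink-conf.yaml might look like
the sketch below. The keys are the standard fixed-delay options; the
numbers are assumptions chosen only to show the shape, not tuned
recommendations.

```yaml
# flink-conf.yaml (sketch; values are assumed, not recommendations)
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10   # assumed: restarts allowed before the job is declared failed
restart-strategy.fixed-delay.delay: 1 s     # assumed: wait between a failure and the next restart attempt
```

Note that with fixed-delay the job fails permanently once the configured
number of attempts is used up, so a very short delay combined with few
attempts can exhaust the budget quickly if the underlying problem
persists.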

Furthermore, two more failovers happened during the initialization of
the recovered tasks. During initialization, a task tries to recover its
state from the last valid checkpoint. A FileNotFound exception occurred
during this recovery process. I'm not quite sure of the reason. Since the
recovery succeeded after two failovers, I think it may be because the
local disks of your cluster are not stable.

Sincerely,
Zhilong


Re: Flink job recovery after task manager failure

Posted by "Afek, Ifat (Nokia - IL/Kfar Sava)" <if...@nokia.com>.
Thanks Zhilong.

The first launch of our job is fast, so I don’t think that’s the issue. I see in the Flink job manager log that there were several exceptions during the restart, and the task manager was restarted a few times until it stabilized.

You can find the log here:
jobmanager-log.txt.gz<https://nokia-my.sharepoint.com/:u:/p/ifat_afek/EUsu4rb_-BpNrkpvSwzI-vgBtBO9OQlIm0CHtW0gsZ7Gqg?email=zhlonghong%40gmail.com&e=ww5Idt>

Thanks,
Ifat



Re: Flink job recovery after task manager failure

Posted by Zhilong Hong <zh...@gmail.com>.
Hi, Afek!

When a TaskManager is killed, the JobManager is not notified until a
heartbeat timeout occurs. Currently, the default value of
heartbeat.timeout is 50 seconds [1]. That's why it takes more than 30
seconds for Flink to trigger a failover. If you'd like to shorten the time
until a failover is triggered in this situation, you could decrease the
value of heartbeat.timeout in flink-conf.yaml. However, if the value is set
too small, heartbeat timeouts will happen more frequently and the cluster
will be unstable. As FLINK-23403 [2] mentions, if you are using Flink 1.14
or 1.15, you could try setting the value to 10s.
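
A minimal sketch of that change in flink-conf.yaml follows. Both options
take milliseconds. The heartbeat.interval line is not something discussed
in this thread; it is an assumption added here because the interval
should stay below the timeout, and its default is 10000 ms.

```yaml
# flink-conf.yaml (sketch)
heartbeat.timeout: 10000    # milliseconds; the default is 50000
heartbeat.interval: 3000    # milliseconds; assumed value, keep it smaller than heartbeat.timeout
```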

You mentioned that it takes 5-6 minutes to restart the jobs. That seems a
bit odd. How long does it take to deploy your job on a brand-new launch?
You could compress the JobManager log, upload it to Google Drive or
OneDrive, and attach the sharing link. Maybe we can find out what happened
from the log.

Sincerely,
Zhilong

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#heartbeat-timeout
[2] https://issues.apache.org/jira/browse/FLINK-23403
