Posted to issues@flink.apache.org by "Matthias Pohl (Jira)" <ji...@apache.org> on 2022/05/18 13:43:00 UTC

[jira] [Comment Edited] (FLINK-27569) Terminated Flink job restarted from empty state when execution.shutdown-on-application-finish is false

    [ https://issues.apache.org/jira/browse/FLINK-27569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17538829#comment-17538829 ] 

Matthias Pohl edited comment on FLINK-27569 at 5/18/22 1:42 PM:
----------------------------------------------------------------

Hi [~gyfora], thanks for your patience. I was away from the keyboard for two weeks. Thanks for jumping in, [~wangyang0918]. As far as I understand, you're trying to prevent the job from being restarted after a JobManager failover once the job has terminated in a Flink cluster running in application mode. That's the exact use case for which {{job-result-store.delete-on-commit}} was introduced (see FLINK-11813).

On failover, the application cluster needs a JobResultStore (JRS) entry to still be around in order to know whether the job was already started and finished (and, therefore, doesn't need to be resubmitted). With {{job-result-store.delete-on-commit}} set to {{false}}, the user takes over the responsibility of cleaning up the JobResultStore entry, as [~wangyang0918] pointed out. So far, it doesn't look like we're observing unexpected behavior here.
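
For illustration, a minimal {{flink-conf.yaml}} sketch of the setup discussed here. The storage path is a placeholder and not taken from this issue; the option names are the ones documented for Flink 1.15, but please double-check them against the configuration reference of your version:

{noformat}
# Sketch only; <bucket> is a placeholder.
# Keep the application cluster running after the job terminates (the setting from this report).
execution.shutdown-on-application-finish: false
# Keep the JobResultStore entry after cleanup and only mark it as clean, so that a
# recovered JobManager can tell that the job already finished. Default: true.
job-result-store.delete-on-commit: false
# Optional; if unset, a directory below high-availability.storageDir is used.
job-result-store.storage-path: s3://<bucket>/flink/job-result-store
{noformat}

With this setting, Flink no longer removes the entries itself, so they have to be deleted by the user once they are not needed anymore.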

[~gyfora], could you confirm my observation, or am I missing something?


was (Author: mapohl):
Hi [~gyfora], thanks for your patience. I was away from the keyboard for two weeks. Thanks for jumping in, [~wangyang0918]. As far as I understand, you're trying to prevent the job from being restarted after a JobManager failover once the job has terminated in an Application Cluster. That's the exact use case for which {{job-result-store.delete-on-commit}} was introduced (see FLINK-11813).

The application cluster failover requires a JRS entry to be around to know whether the job was already started and finished (and, therefore, doesn't need to be resubmitted). In that case, the user takes over the responsibility to clean up the JobResultStore entry as [~wangyang0918] pointed out. So far, it doesn't look like we're observing unexpected behavior here.

[~gyfora], could you confirm my observation, or am I missing something?

> Terminated Flink job restarted from empty state when execution.shutdown-on-application-finish is false
> ------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-27569
>                 URL: https://issues.apache.org/jira/browse/FLINK-27569
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / Kubernetes, Runtime / Checkpointing
>    Affects Versions: 1.15.0
>            Reporter: Gyula Fora
>            Priority: Critical
>         Attachments: Screenshot 2022-05-11 at 08.46.51.png, Screenshot 2022-05-11 at 08.50.03.png
>
>
> When JobManager HA is enabled and execution.shutdown-on-application-finish = false, terminated jobs (failed, cancelled, etc.) will be resubmitted from a completely empty state on JobManager failover.
> Consider the following situation with Flink 1.15, HA enabled, and shutdown on application finish turned off:
> 1. Submit Flink application cluster
> 2. Call cancel with savepoint -> see logs below
> The job successfully finishes with a savepoint:
> {noformat}
> 2022-05-11 06:42:48,562 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Job 00000000000000000000000000000000 reached terminal state FINISHED.
> 2022-05-11 06:42:48,624 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Job 00000000000000000000000000000000 has been registered for cleanup in the JobResultStore after reaching a terminal state.
> 2022-05-11 06:42:48,626 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Stopping the JobMaster for job 'State machine job' (00000000000000000000000000000000).
> 2022-05-11 06:42:48,629 INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Shutting down
> 2022-05-11 06:42:48,647 INFO  org.apache.flink.kubernetes.highavailability.KubernetesCheckpointIDCounter [] - Shutting down.
> 2022-05-11 06:42:48,647 INFO  org.apache.flink.kubernetes.highavailability.KubernetesCheckpointIDCounter [] - Removing counter from ConfigMap basic-checkpoint-ha-example-00000000000000000000000000000000-config-map
> 2022-05-11 06:42:48,652 INFO  org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - Releasing slot [0cdb18eefcb2133049223214d4716fa0].
> 2022-05-11 06:42:48,653 INFO  org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - Releasing slot [bf5ece74692d786f6ba2b067c76ee1d9].
> 2022-05-11 06:42:48,653 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Close ResourceManager connection 220ea961c86ea8042fde2151fd05a5c9: Stopping JobMaster for job 'State machine job' (00000000000000000000000000000000).
> 2022-05-11 06:42:48,653 INFO  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService.
> 2022-05-11 06:42:48,653 INFO  org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver [] - Stopping KubernetesLeaderRetrievalDriver{configMapName='basic-checkpoint-ha-example-cluster-config-map'}.
> 2022-05-11 06:42:48,655 INFO  org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Stopped to watch for default/basic-checkpoint-ha-example-cluster-config-map, watching id:9a1bc36b-6a76-4970-96a0-945e9a12b66d
> 2022-05-11 06:42:48,655 INFO  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService.
> 2022-05-11 06:42:48,655 INFO  org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver [] - Stopping KubernetesLeaderRetrievalDriver{configMapName='basic-checkpoint-ha-example-cluster-config-map'}.
> 2022-05-11 06:42:48,655 INFO  org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Stopped to watch for default/basic-checkpoint-ha-example-cluster-config-map, watching id:5facec4c-d888-43b4-88d0-d1f34912d35a
> 2022-05-11 06:42:48,655 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Disconnect job manager 969eeac09f5cf4813103003495204620@akka.tcp://flink@172.17.0.6:6123/user/rpc/jobmanager_2 for job 00000000000000000000000000000000 from the resource manager.
> 2022-05-11 06:42:48,660 INFO  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Stopping DefaultLeaderElectionService.
> 2022-05-11 06:42:48,723 INFO  org.apache.flink.kubernetes.highavailability.KubernetesMultipleComponentLeaderElectionHaServices [] - Clean up the high availability data for job 00000000000000000000000000000000.
> 2022-05-11 06:42:48,753 INFO  org.apache.flink.runtime.jobmanager.DefaultJobGraphStore     [] - Removed job graph 00000000000000000000000000000000 from KubernetesStateHandleStore{configMapName='basic-checkpoint-ha-example-cluster-config-map'}.
> 2022-05-11 06:42:48,758 INFO  org.apache.flink.kubernetes.highavailability.KubernetesMultipleComponentLeaderElectionHaServices [] - Finished cleaning up the high availability data for job 00000000000000000000000000000000.
> 2022-05-11 06:42:50,321 INFO  org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application completed SUCCESSFULLY{noformat}
> !Screenshot 2022-05-11 at 08.46.51.png|width=882,height=106!
> 3. Trigger JobManager failover
> The JobManager recovers, but resubmits the job from an empty state:
> {noformat}
> 2022-05-11 06:48:04,535 INFO  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job 00000000000000000000000000000000 is submitted.
> 2022-05-11 06:48:04,535 INFO  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=00000000000000000000000000000000.
> 2022-05-11 06:48:04,629 INFO  org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Recovered 0 pods from previous attempts, current attempt id is 1.
> 2022-05-11 06:48:04,629 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Recovered 0 workers from previous attempt.
> 2022-05-11 06:48:04,650 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Received JobGraph submission 'State machine job' (00000000000000000000000000000000).
> 2022-05-11 06:48:04,652 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Submitting job 'State machine job' (00000000000000000000000000000000).
> 2022-05-11 06:48:04,746 INFO  org.apache.flink.runtime.jobmanager.DefaultJobGraphStore     [] - Added JobGraph(jobId: 00000000000000000000000000000000) to KubernetesStateHandleStore{configMapName='basic-checkpoint-ha-example-cluster-config-map'}.
> 2022-05-11 06:48:04,826 INFO  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Starting DefaultLeaderElectionService with org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriverAdapter@370a1b27.
> 2022-05-11 06:48:04,838 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService             [] - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_2 .
> 2022-05-11 06:48:04,843 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Initializing job 'State machine job' (00000000000000000000000000000000).
> 2022-05-11 06:48:04,926 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using restart back off time strategy FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, backoffTimeMS=1000) for State machine job (00000000000000000000000000000000).
> 2022-05-11 06:48:04,955 INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Recovering checkpoints from KubernetesStateHandleStore{configMapName='basic-checkpoint-ha-example-00000000000000000000000000000000-config-map'}.
> 2022-05-11 06:48:04,959 INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Found 0 checkpoints in KubernetesStateHandleStore{configMapName='basic-checkpoint-ha-example-00000000000000000000000000000000-config-map'}.
> 2022-05-11 06:48:04,959 INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Trying to fetch 0 checkpoints from storage.
> 2022-05-11 06:48:04,974 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Running initialization on master for job State machine job (00000000000000000000000000000000).
> 2022-05-11 06:48:04,974 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Successfully ran initialization on master in 0 ms.
> 2022-05-11 06:48:05,032 INFO  org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 new pipelined regions in 0 ms, total 1 pipelined regions currently.
> 2022-05-11 06:48:05,035 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@670a312c
> 2022-05-11 06:48:05,035 INFO  org.apache.flink.runtime.state.StateBackendLoader            [] - State backend loader loads the state backend as HashMapStateBackend
> 2022-05-11 06:48:05,036 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Checkpoint storage is set to 'jobmanager'
> 2022-05-11 06:48:05,053 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - No checkpoint found during restore.
> 2022-05-11 06:48:05,058 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@bdfaf5f for State machine job (00000000000000000000000000000000).
> 2022-05-11 06:48:05,065 INFO  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Starting DefaultLeaderRetrievalService with KubernetesLeaderRetrievalDriver{configMapName='basic-checkpoint-ha-example-cluster-config-map'}.
> 2022-05-11 06:48:05,066 INFO  org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Starting to watch for default/basic-checkpoint-ha-example-cluster-config-map, watching id:83411d91-3094-46c5-b2cc-0576bf5cc161
> 2022-05-11 06:48:05,126 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Starting execution of job 'State machine job' (00000000000000000000000000000000) under job master id 9c63401786b3856e5c8a0cf069e44198.
> 2022-05-11 06:48:05,132 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
> {noformat}
> !Screenshot 2022-05-11 at 08.50.03.png|width=861,height=79!
>  
> In addition, the checkpoint history is lost (which is probably the main cause of the issue).


