Posted to issues@flink.apache.org by "Matthias Pohl (Jira)" <ji...@apache.org> on 2022/05/18 13:43:00 UTC
[jira] [Comment Edited] (FLINK-27569) Terminated Flink job restarted from empty state when execution.shutdown-on-application-finish is false
[ https://issues.apache.org/jira/browse/FLINK-27569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17538829#comment-17538829 ]
Matthias Pohl edited comment on FLINK-27569 at 5/18/22 1:42 PM:
----------------------------------------------------------------
Hi [~gyfora], thanks for your patience. I was off the keyboard for two weeks. Thanks for jumping in, [~wangyang0918]. As far as I understand, you're trying to prevent a job that has already terminated in a Flink cluster running in application mode from being restarted after a JobManager failover. That's the exact use case for which {{job-result-store.delete-on-commit}} was introduced (see FLINK-11813).
On failover, the application cluster needs a JobResultStore (JRS) entry to still be present in order to know that the job was already started and finished (and, therefore, doesn't need to be resubmitted). If entries are kept this way ({{job-result-store.delete-on-commit}} set to false), the user takes over the responsibility of cleaning up the JobResultStore entry, as [~wangyang0918] pointed out. So far, it doesn't look like we're observing unexpected behavior here.
[~gyfora], can you confirm my observation, or am I missing something?
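The failover decision described in this comment can be sketched as a small, self-contained Java model. This is not Flink's implementation: `ToyJobResultStore`, `markDirty`, `markClean`, `shouldResubmit` and `simulateFailover` are hypothetical names, and the model assumes that the only input to the decision is whether a terminal result for the (fixed) job ID is still recorded after cleanup.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (hypothetical, not Flink code) of the resubmit-on-failover decision. */
public class JrsFailoverModel {

    /** Hypothetical stand-in for the JobResultStore: job ID -> terminal status. */
    static final class ToyJobResultStore {
        private final Map<String, String> entries = new HashMap<>();
        private final boolean deleteOnCommit; // models job-result-store.delete-on-commit

        ToyJobResultStore(boolean deleteOnCommit) {
            this.deleteOnCommit = deleteOnCommit;
        }

        /** The job reached a terminal state: record the result before cleanup starts. */
        void markDirty(String jobId, String status) {
            entries.put(jobId, status);
        }

        /** Cleanup finished: delete the entry if delete-on-commit is set, otherwise keep it. */
        void markClean(String jobId) {
            if (deleteOnCommit) {
                entries.remove(jobId);
            }
        }

        boolean hasResult(String jobId) {
            return entries.containsKey(jobId);
        }
    }

    /** On failover, resubmit only if no terminal result is known for the job. */
    static boolean shouldResubmit(ToyJobResultStore jrs, String jobId) {
        return !jrs.hasResult(jobId);
    }

    /** Runs finish -> cleanup -> failover and returns whether the job would be resubmitted. */
    static boolean simulateFailover(boolean deleteOnCommit) {
        String jobId = "00000000000000000000000000000000"; // fixed job ID seen in the logs
        ToyJobResultStore jrs = new ToyJobResultStore(deleteOnCommit);
        jrs.markDirty(jobId, "FINISHED");
        jrs.markClean(jobId);
        return shouldResubmit(jrs, jobId);
    }

    public static void main(String[] args) {
        System.out.println("delete-on-commit=true  -> resubmit: " + simulateFailover(true));
        System.out.println("delete-on-commit=false -> resubmit: " + simulateFailover(false));
    }
}
```

Under these assumptions, deleting the entry on commit leaves nothing for the failover to find, so the job is resubmitted; keeping it prevents that, and a user who later removes the entry manually gets the resubmit behavior back.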
> Terminated Flink job restarted from empty state when execution.shutdown-on-application-finish is false
> ------------------------------------------------------------------------------------------------------
>
> Key: FLINK-27569
> URL: https://issues.apache.org/jira/browse/FLINK-27569
> Project: Flink
> Issue Type: Bug
> Components: Deployment / Kubernetes, Runtime / Checkpointing
> Affects Versions: 1.15.0
> Reporter: Gyula Fora
> Priority: Critical
> Attachments: Screenshot 2022-05-11 at 08.46.51.png, Screenshot 2022-05-11 at 08.50.03.png
>
>
> When JobManager HA is enabled and execution.shutdown-on-application-finish = false, terminated jobs (failed, cancelled, etc.) are resubmitted from a completely empty state on JobManager failover.
> To reproduce (Flink 1.15, HA enabled, execution.shutdown-on-application-finish = false):
> 1. Submit a Flink application cluster.
> 2. Cancel the job with a savepoint (see logs below).
> The job successfully finishes with the savepoint:
> {noformat}
> 2022-05-11 06:42:48,562 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 00000000000000000000000000000000 reached terminal state FINISHED.
> 2022-05-11 06:42:48,624 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 00000000000000000000000000000000 has been registered for cleanup in the JobResultStore after reaching a terminal state.
> 2022-05-11 06:42:48,626 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Stopping the JobMaster for job 'State machine job' (00000000000000000000000000000000).
> 2022-05-11 06:42:48,629 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Shutting down
> 2022-05-11 06:42:48,647 INFO org.apache.flink.kubernetes.highavailability.KubernetesCheckpointIDCounter [] - Shutting down.
> 2022-05-11 06:42:48,647 INFO org.apache.flink.kubernetes.highavailability.KubernetesCheckpointIDCounter [] - Removing counter from ConfigMap basic-checkpoint-ha-example-00000000000000000000000000000000-config-map
> 2022-05-11 06:42:48,652 INFO org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - Releasing slot [0cdb18eefcb2133049223214d4716fa0].
> 2022-05-11 06:42:48,653 INFO org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - Releasing slot [bf5ece74692d786f6ba2b067c76ee1d9].
> 2022-05-11 06:42:48,653 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Close ResourceManager connection 220ea961c86ea8042fde2151fd05a5c9: Stopping JobMaster for job 'State machine job' (00000000000000000000000000000000).
> 2022-05-11 06:42:48,653 INFO org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService.
> 2022-05-11 06:42:48,653 INFO org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver [] - Stopping KubernetesLeaderRetrievalDriver{configMapName='basic-checkpoint-ha-example-cluster-config-map'}.
> 2022-05-11 06:42:48,655 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Stopped to watch for default/basic-checkpoint-ha-example-cluster-config-map, watching id:9a1bc36b-6a76-4970-96a0-945e9a12b66d
> 2022-05-11 06:42:48,655 INFO org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService.
> 2022-05-11 06:42:48,655 INFO org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver [] - Stopping KubernetesLeaderRetrievalDriver{configMapName='basic-checkpoint-ha-example-cluster-config-map'}.
> 2022-05-11 06:42:48,655 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Stopped to watch for default/basic-checkpoint-ha-example-cluster-config-map, watching id:5facec4c-d888-43b4-88d0-d1f34912d35a
> 2022-05-11 06:42:48,655 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Disconnect job manager 969eeac09f5cf4813103003495204620@akka.tcp://flink@172.17.0.6:6123/user/rpc/jobmanager_2 for job 00000000000000000000000000000000 from the resource manager.
> 2022-05-11 06:42:48,660 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Stopping DefaultLeaderElectionService.
> 2022-05-11 06:42:48,723 INFO org.apache.flink.kubernetes.highavailability.KubernetesMultipleComponentLeaderElectionHaServices [] - Clean up the high availability data for job 00000000000000000000000000000000.
> 2022-05-11 06:42:48,753 INFO org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Removed job graph 00000000000000000000000000000000 from KubernetesStateHandleStore{configMapName='basic-checkpoint-ha-example-cluster-config-map'}.
> 2022-05-11 06:42:48,758 INFO org.apache.flink.kubernetes.highavailability.KubernetesMultipleComponentLeaderElectionHaServices [] - Finished cleaning up the high availability data for job 00000000000000000000000000000000.
> 2022-05-11 06:42:50,321 INFO org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application completed SUCCESSFULLY{noformat}
> !Screenshot 2022-05-11 at 08.46.51.png|width=882,height=106!
> 3. Trigger a JobManager failover.
> The JobManager recovers, but resubmits the job from an empty state:
> {noformat}
> 2022-05-11 06:48:04,535 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job 00000000000000000000000000000000 is submitted.
> 2022-05-11 06:48:04,535 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=00000000000000000000000000000000.
> 2022-05-11 06:48:04,629 INFO org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Recovered 0 pods from previous attempts, current attempt id is 1.
> 2022-05-11 06:48:04,629 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Recovered 0 workers from previous attempt.
> 2022-05-11 06:48:04,650 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received JobGraph submission 'State machine job' (00000000000000000000000000000000).
> 2022-05-11 06:48:04,652 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Submitting job 'State machine job' (00000000000000000000000000000000).
> 2022-05-11 06:48:04,746 INFO org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Added JobGraph(jobId: 00000000000000000000000000000000) to KubernetesStateHandleStore{configMapName='basic-checkpoint-ha-example-cluster-config-map'}.
> 2022-05-11 06:48:04,826 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Starting DefaultLeaderElectionService with org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriverAdapter@370a1b27.
> 2022-05-11 06:48:04,838 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_2 .
> 2022-05-11 06:48:04,843 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing job 'State machine job' (00000000000000000000000000000000).
> 2022-05-11 06:48:04,926 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using restart back off time strategy FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, backoffTimeMS=1000) for State machine job (00000000000000000000000000000000).
> 2022-05-11 06:48:04,955 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Recovering checkpoints from KubernetesStateHandleStore{configMapName='basic-checkpoint-ha-example-00000000000000000000000000000000-config-map'}.
> 2022-05-11 06:48:04,959 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Found 0 checkpoints in KubernetesStateHandleStore{configMapName='basic-checkpoint-ha-example-00000000000000000000000000000000-config-map'}.
> 2022-05-11 06:48:04,959 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Trying to fetch 0 checkpoints from storage.
> 2022-05-11 06:48:04,974 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Running initialization on master for job State machine job (00000000000000000000000000000000).
> 2022-05-11 06:48:04,974 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Successfully ran initialization on master in 0 ms.
> 2022-05-11 06:48:05,032 INFO org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 new pipelined regions in 0 ms, total 1 pipelined regions currently.
> 2022-05-11 06:48:05,035 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@670a312c
> 2022-05-11 06:48:05,035 INFO org.apache.flink.runtime.state.StateBackendLoader [] - State backend loader loads the state backend as HashMapStateBackend
> 2022-05-11 06:48:05,036 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Checkpoint storage is set to 'jobmanager'
> 2022-05-11 06:48:05,053 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No checkpoint found during restore.
> 2022-05-11 06:48:05,058 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@bdfaf5f for State machine job (00000000000000000000000000000000).
> 2022-05-11 06:48:05,065 INFO org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Starting DefaultLeaderRetrievalService with KubernetesLeaderRetrievalDriver{configMapName='basic-checkpoint-ha-example-cluster-config-map'}.
> 2022-05-11 06:48:05,066 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Starting to watch for default/basic-checkpoint-ha-example-cluster-config-map, watching id:83411d91-3094-46c5-b2cc-0576bf5cc161
> 2022-05-11 06:48:05,126 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Starting execution of job 'State machine job' (00000000000000000000000000000000) under job master id 9c63401786b3856e5c8a0cf069e44198.
> 2022-05-11 06:48:05,132 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
> {noformat}
> !Screenshot 2022-05-11 at 08.50.03.png|width=861,height=79!
>
> In addition, the checkpoint history is lost, which is probably the main cause of the issue.
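The lost checkpoint history can be illustrated with a second toy model, based only on the log lines in the report: after the job reaches FINISHED, the HA cleanup removes its checkpoint metadata (the checkpoint counter is removed from the job's ConfigMap), so the resubmitted job, which reuses the same fixed job ID, finds 0 checkpoints and starts from an empty state. `CheckpointHistoryModel` and its methods are hypothetical; Flink's real `DefaultCompletedCheckpointStore` and its Kubernetes-backed storage are not reproduced here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;

/** Toy model (hypothetical, not Flink code) of checkpoint metadata removed by HA cleanup. */
public class CheckpointHistoryModel {

    // Hypothetical HA checkpoint store: job ID -> IDs of completed checkpoints.
    private final Map<String, List<Long>> checkpointsByJob = new HashMap<>();

    void completeCheckpoint(String jobId, long checkpointId) {
        checkpointsByJob.computeIfAbsent(jobId, id -> new ArrayList<>()).add(checkpointId);
    }

    // Loosely mirrors "Clean up the high availability data for job ..." after a terminal state.
    void cleanupHaData(String jobId) {
        checkpointsByJob.remove(jobId);
    }

    // Loosely mirrors "Found N checkpoints in KubernetesStateHandleStore..." on (re)submission.
    OptionalLong latestCheckpointOnRestore(String jobId) {
        List<Long> found = checkpointsByJob.getOrDefault(jobId, List.of());
        return found.isEmpty() ? OptionalLong.empty() : OptionalLong.of(found.get(found.size() - 1));
    }

    public static void main(String[] args) {
        String jobId = "00000000000000000000000000000000"; // fixed job ID seen in the logs
        CheckpointHistoryModel store = new CheckpointHistoryModel();
        store.completeCheckpoint(jobId, 1L);
        store.completeCheckpoint(jobId, 2L);
        System.out.println("before cleanup: " + store.latestCheckpointOnRestore(jobId));
        store.cleanupHaData(jobId);
        // The resubmitted job reuses the same ID but finds no checkpoint to restore from.
        System.out.println("after cleanup:  " + store.latestCheckpointOnRestore(jobId));
    }
}
```

In this model the cleanup is correct for a job that really is finished; the problem only appears because the failover treats the job as new and resubmits it under the same ID.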
--
This message was sent by Atlassian Jira
(v8.20.7#820007)