Posted to user@flink.apache.org by Alexey Trenikhun <ye...@msn.com> on 2021/12/11 05:46:42 UTC

Flink 1.13.3, k8s HA - ResourceManager was revoked leadership

Hello,
I'm running Flink 1.13.3 with Kubernetes HA. The JM restarts periodically. In the log below the job runs for ~8 minutes, then leadership is suddenly revoked, the job reaches a terminal state, and K8s restarts the failed JM:

{"timestamp":"2021-12-11T04:51:53.697Z","message":"Agent Info (1/1) (47e6706e52ad96111a3d722cc56b5752) switched from INITIALIZING to RUNNING.","logger_name":"org.apache.flink.runtime.executiongraph.ExecutionGraph","thread_name":"flink-akka.actor.default-dispatcher-2","level":"INFO","level_value":20000}
{"timestamp":"2021-12-11T05:06:10.483Z","message":"ResourceManager akka.tcp://flink@10.244.104.239:6123/user/rpc/resourcemanager_0 was revoked leadership. Clearing fencing token.","logger_name":"org.apache.flink.runtime.resourcemanager.StandaloneResourceManager","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":20000}
{"timestamp":"2021-12-11T05:06:10.484Z","message":"Stopping DefaultLeaderRetrievalService.","logger_name":"org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":20000}
{"timestamp":"2021-12-11T05:06:10.484Z","message":"Stopping KubernetesLeaderRetrievalDriver{configMapName='gsp-00000000000000000000000000000000-jobmanager-leader'}.","logger_name":"org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":20000}
{"timestamp":"2021-12-11T05:06:10.485Z","message":"The watcher is closing.","logger_name":"org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapWatcher","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":20000}
{"timestamp":"2021-12-11T05:06:10.487Z","message":"Suspending the slot manager.","logger_name":"org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":20000}
{"timestamp":"2021-12-11T05:06:10.555Z","message":"DefaultDispatcherRunner was revoked the leadership with leader id 138b4029-88eb-409f-98cc-e296fe400eb8. Stopping the DispatcherLeaderProcess.","logger_name":"org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunner","thread_name":"KubernetesLeaderElector-ExecutorService-thread-1","level":"INFO","level_value":20000}
{"timestamp":"2021-12-11T05:06:10.556Z","message":"Stopping SessionDispatcherLeaderProcess.","logger_name":"org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess","thread_name":"KubernetesLeaderElector-ExecutorService-thread-1","level":"INFO","level_value":20000}
{"timestamp":"2021-12-11T05:06:10.557Z","message":"Stopping dispatcher akka.tcp://flink@10.244.104.239:6123/user/rpc/dispatcher_1.","logger_name":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":20000}
{"timestamp":"2021-12-11T05:06:10.558Z","message":"Stopping all currently running jobs of dispatcher akka.tcp://flink@10.244.104.239:6123/user/rpc/dispatcher_1.","logger_name":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":20000}
{"timestamp":"2021-12-11T05:06:10.560Z","message":"Stopping the JobMaster for job gim(00000000000000000000000000000000).","logger_name":"org.apache.flink.runtime.jobmaster.JobMaster","thread_name":"flink-akka.actor.default-dispatcher-2","level":"INFO","level_value":20000}
{"timestamp":"2021-12-11T05:06:10.565Z","message":"Job gim (00000000000000000000000000000000) switched from state RUNNING to SUSPENDED.","logger_name":"org.apache.flink.runtime.executiongraph.ExecutionGraph","thread_name":"flink-akka.actor.default-dispatcher-2","level":"INFO","level_value":20000,"stack_trace":"org.apache.flink.util.FlinkException: Scheduler is being stopped.\n\tat org.apache.flink.runtime.scheduler.SchedulerBase.closeAsync(SchedulerBase.java:607)\n\tat org.apache.flink.runtime.jobmaster.JobMaster.stopScheduling(JobMaster.java:962)\n\tat org.apache.flink.runtime.jobmaster.JobMaster.stopJobExecution(JobMaster.java:926)\n\tat org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:398)\n\tat org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:214)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:563)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:186)\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)\n\tat scala.PartialFunction.applyOrElse(PartialFunction.scala:123)\n\tat scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)\n\tat akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat akka.actor.Actor.aroundReceive(Actor.scala:517)\n\tat akka.actor.Actor.aroundReceive$(Actor.scala:515)\n\tat akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)\n\tat akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)\n\tat akka.actor.ActorCell.invoke(ActorCell.scala:561)\n\tat akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)\n\tat akka.dispatch.Mailbox.run(Mailbox.scala:225)\n\tat akka.dispatch.Mailbox.exec(Mailbox.scala:235)\n\tat akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)\n\tat akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)\n\tat akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)\n\tat akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)\n"}
{"timestamp":"2021-12-11T05:06:10.566Z","message":"Job 00000000000000000000000000000000 reached terminal state SUSPENDED.","logger_name":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":20000}
...

The TM shows no errors until the JM starts cancelling tasks:

{"timestamp":"2021-12-11T05:05:50.681Z","message":"Source: voice-callthread, idle: {}","logger_name":"gim.fsp.util.flink.kafka.GKafkaFetcher","thread_name":"g-event-time-alignment-ts-0","level":"INFO","level_value":20000,"operator_name":"Source: voice-callthread"}
{"timestamp":"2021-12-11T05:06:10.574Z","message":"Attempting to cancel task Source: heartbeat -> Sink: heartbeat (1/1)#1 (858bac81394274fbff76471f502b30d0).","logger_name":"org.apache.flink.runtime.taskmanager.Task","thread_name":"flink-akka.actor.default-dispatcher-13","level":"INFO","level_value":20000}
{"timestamp":"2021-12-11T05:06:10.575Z","message":"Source: heartbeat -> Sink: heartbeat (1/1)#1 (858bac81394274fbff76471f502b30d0) switched from RUNNING to CANCELING.","logger_name":"org.apache.flink.runtime.taskmanager.Task","thread_name":"flink-akka.actor.default-dispatcher-13","level":"INFO","level_value":20000}
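
For anyone triaging similar JSON logs, here is a minimal sketch (not part of the original report) that lists the revocation messages and the longest silence between consecutive records. It assumes only the `timestamp`, `message` and `logger_name` fields visible above; the helper names and the abbreviated `SAMPLE` lines are made up for illustration.

```python
import json
from datetime import datetime


def parse_ts(value):
    # Timestamps in the pasted log look like 2021-12-11T05:06:10.483Z (UTC).
    return datetime.strptime(value.replace("Z", "+0000"), "%Y-%m-%dT%H:%M:%S.%f%z")


def read_records(lines):
    """Parse JSON log lines, skipping anything that is not a complete JSON object."""
    records = []
    for line in lines:
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue
        if isinstance(record, dict) and "timestamp" in record:
            records.append(record)
    return records


def revocations(records):
    """Return (timestamp, logger_name) for every message that mentions 'revoked'."""
    return [
        (r["timestamp"], r.get("logger_name", ""))
        for r in records
        if "revoked" in r.get("message", "")
    ]


def longest_gap(records):
    """Return (seconds, before, after) for the longest silence between records."""
    stamps = sorted(parse_ts(r["timestamp"]) for r in records)
    gaps = [((b - a).total_seconds(), a, b) for a, b in zip(stamps, stamps[1:])]
    return max(gaps, key=lambda g: g[0]) if gaps else None


# Abbreviated copies of three JM log lines from this mail (fields trimmed).
SAMPLE = [
    '{"timestamp":"2021-12-11T04:51:53.697Z","message":"Agent Info (1/1) switched from INITIALIZING to RUNNING.","logger_name":"org.apache.flink.runtime.executiongraph.ExecutionGraph"}',
    '{"timestamp":"2021-12-11T05:06:10.483Z","message":"ResourceManager was revoked leadership. Clearing fencing token.","logger_name":"org.apache.flink.runtime.resourcemanager.StandaloneResourceManager"}',
    '{"timestamp":"2021-12-11T05:06:10.555Z","message":"DefaultDispatcherRunner was revoked the leadership.","logger_name":"org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunner"}',
    "...",
]

if __name__ == "__main__":
    recs = read_records(SAMPLE)
    for ts, logger in revocations(recs):
        print(ts, logger)
    print("longest gap (s):", longest_gap(recs)[0])
```

A long silence in an INFO-level log is only a hint, not proof of a stall; it mainly shows where to look in the full JM log.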

What is causing the leadership revocation?

Thanks,
Alexey

Re: Flink 1.13.3, k8s HA - ResourceManager was revoked leadership

Posted by Alexey Trenikhun <ye...@msn.com>.
It doesn't look like the JM had a long GC pause. If I read the log correctly, the GC took [Times: user=0.03 sys=0.00, real=0.03 secs], and it happened minutes before the ResourceManager leadership was revoked (log is attached).
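
A rough way to repeat this check over a whole GC log is sketched below. It assumes the JVM prints the `[Times: user=... sys=..., real=... secs]` suffix quoted above. The function name, the second sample line and the 15.0 s threshold are illustrative assumptions; use your configured renew-deadline as the threshold.

```python
import re

# Matches the suffix quoted above, e.g. [Times: user=0.03 sys=0.00, real=0.03 secs]
TIMES_RE = re.compile(r"\[Times: user=([\d.]+) sys=([\d.]+), real=([\d.]+) secs\]")


def long_pauses(gc_log_lines, threshold_secs):
    """Return (real_seconds, line) for GC events whose wall-clock time reaches the threshold."""
    hits = []
    for line in gc_log_lines:
        match = TIMES_RE.search(line)
        if match is None:
            continue
        real = float(match.group(3))
        if real >= threshold_secs:
            hits.append((real, line.strip()))
    return hits


if __name__ == "__main__":
    sample = [
        "[Times: user=0.03 sys=0.00, real=0.03 secs]",   # value quoted in this mail
        "[Times: user=1.20 sys=0.10, real=21.50 secs]",  # hypothetical long pause
    ]
    for real, line in long_pauses(sample, threshold_secs=15.0):
        print(f"{real:.2f}s  {line}")
```

Only the `real` (wall-clock) value matters here, since that is the time during which the JM could not renew its lease.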

Thanks,
Alexey
________________________________
From: Yang Wang <da...@gmail.com>
Sent: Wednesday, December 15, 2021 6:50 PM
To: Alexey Trenikhun <ye...@msn.com>
Cc: David Morávek <dm...@apache.org>; Flink User Mail List <us...@flink.apache.org>
Subject: Re: Flink 1.13.3, k8s HA - ResourceManager was revoked leadership

Could you please check whether the JobManager had a long full GC, which could cause the leadership to be lost?

BTW, increasing the timeout should help.

high-availability.kubernetes.leader-election.lease-duration: 60s
high-availability.kubernetes.leader-election.renew-deadline: 60s

Best,
Yang



Re: Flink 1.13.3, k8s HA - ResourceManager was revoked leadership

Posted by Yang Wang <da...@gmail.com>.
Could you please check whether the JobManager had a long full GC, which could
cause the leadership to be lost?

BTW, increasing the timeout should help.

high-availability.kubernetes.leader-election.lease-duration: 60s
high-availability.kubernetes.leader-election.renew-deadline: 60s

Best,
Yang


Alexey Trenikhun <ye...@msn.com> wrote on Tue, Dec 14, 2021 at 05:36:

> Hi David,
>
> Setup is application mode, single job, single JM (Kubernetes job), k8s
> v1.18.2. I'm attaching JM log.
>
>
> Thanks,
> Alexey

Re: Flink 1.13.3, k8s HA - ResourceManager was revoked leadership

Posted by Alexey Trenikhun <ye...@msn.com>.
Hi David,

Setup is application mode, single job, single JM (Kubernetes job), k8s v1.18.2. I'm attaching JM log.


Thanks,
Alexey
________________________________
From: David Morávek <dm...@apache.org>
Sent: Monday, December 13, 2021 12:59 AM
To: Alexey Trenikhun <ye...@msn.com>
Cc: Flink User Mail List <us...@flink.apache.org>
Subject: Re: Flink 1.13.3, k8s HA - ResourceManager was revoked leadership

Hi Alexey,

please be aware that the JSON-based logs in the mail may not make it past the spam filter (at least for Gmail they did not) :(

K8s-based leader election relies on optimistic locking of the underlying ConfigMap (roughly: periodically updating the lease annotation on the ConfigMap). If the JM fails to update this lease within a deadline, leadership is lost.
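
The mechanism described above can be illustrated with a small in-memory sketch. This is not Flink's or the Kubernetes client's code: `FakeApiServer`, the `holder` / `renew_time` annotation keys and the integer clock are all hypothetical stand-ins, chosen only to show how a version check on write plus a lease timestamp gives leader election.

```python
from dataclasses import dataclass, field


class Conflict(Exception):
    """Raised when a write is based on a stale resource version."""


@dataclass
class ConfigMap:
    resource_version: int = 0
    annotations: dict = field(default_factory=dict)


class FakeApiServer:
    """Hypothetical stand-in for the API server holding one ConfigMap."""

    def __init__(self):
        self._cm = ConfigMap()

    def get(self):
        # Hand out a copy so callers cannot mutate the stored object directly.
        return ConfigMap(self._cm.resource_version, dict(self._cm.annotations))

    def replace(self, cm):
        # Optimistic locking: accept the write only if nobody wrote in between.
        if cm.resource_version != self._cm.resource_version:
            raise Conflict("stale resource version")
        self._cm = ConfigMap(cm.resource_version + 1, dict(cm.annotations))


def try_acquire_or_renew(api, identity, now, lease_duration):
    """Return True if `identity` holds the lease after this attempt."""
    cm = api.get()
    holder = cm.annotations.get("holder")
    renew_time = cm.annotations.get("renew_time")
    expired = renew_time is None or now - renew_time > lease_duration
    if holder not in (None, identity) and not expired:
        return False  # someone else holds a lease that is still valid
    cm.annotations.update(holder=identity, renew_time=now)
    try:
        api.replace(cm)
    except Conflict:
        return False  # lost the race against a concurrent writer
    return True


if __name__ == "__main__":
    api = FakeApiServer()
    print(try_acquire_or_renew(api, "jm-1", now=0, lease_duration=15))   # jm-1 acquires
    print(try_acquire_or_renew(api, "jm-2", now=5, lease_duration=15))   # lease still valid
    print(try_acquire_or_renew(api, "jm-1", now=10, lease_duration=15))  # jm-1 renews
    print(try_acquire_or_renew(api, "jm-2", now=30, lease_duration=15))  # jm-1 stalled, lease expired
    print(try_acquire_or_renew(api, "jm-1", now=31, lease_duration=15))  # jm-1 has lost leadership
```

The sketch only covers the case where a second contender takes over an expired lease. In a single-JM setup like the one discussed here, the revocation would presumably come from the elector itself giving up once it cannot renew within the deadline.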

Can you please elaborate a bit about your setup and your k8s related Flink configurations? Also could you share the whole JM log by any chance (gist / email attachment)?

Best,
D.

On Sat, Dec 11, 2021 at 6:47 AM Alexey Trenikhun <ye...@msn.com>> wrote:
Hello,
I'm running Flink 1.13.3 with Kubernetes HA. JM periodically restarts after some time, in log below job runs ~8 minutes, then suddenly leadership was revoked, job reaches terminal state and K8s restarts failed JM:

{"timestamp":"2021-12-11T04:51:53.697Z","message":"Agent Info (1/1) (47e6706e52ad96111a3d722cc56b5752) switched from INITIALIZING to RUNNING.","logger_name":"org.apache.flink.runtime.executiongraph.ExecutionGraph","thread_name":"flink-akka.actor.default-dispatcher-2","level":"INFO","level_value":20000}
{"timestamp":"2021-12-11T05:06:10.483Z","message":"ResourceManager akka.tcp://flink@10.244.104.239:6123/user/rpc/resourcemanager_0<http://flink@10.244.104.239:6123/user/rpc/resourcemanager_0> was revoked leadership. Clearing fencing token.","logger_name":"org.apache.flink.runtime.resourcemanager.StandaloneResourceManager","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":20000}
{"timestamp":"2021-12-11T05:06:10.484Z","message":"Stopping DefaultLeaderRetrievalService.","logger_name":"org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":20000}
{"timestamp":"2021-12-11T05:06:10.484Z","message":"Stopping KubernetesLeaderRetrievalDriver{configMapName='gsp-00000000000000000000000000000000-jobmanager-leader'}.","logger_name":"org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":20000}
{"timestamp":"2021-12-11T05:06:10.485Z","message":"The watcher is closing.","logger_name":"org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapWatcher","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":20000}
{"timestamp":"2021-12-11T05:06:10.487Z","message":"Suspending the slot manager.","logger_name":"org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":20000}
{"timestamp":"2021-12-11T05:06:10.555Z","message":"DefaultDispatcherRunner was revoked the leadership with leader id 138b4029-88eb-409f-98cc-e296fe400eb8. Stopping the DispatcherLeaderProcess.","logger_name":"org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunner","thread_name":"KubernetesLeaderElector-ExecutorService-thread-1","level":"INFO","level_value":20000}
{"timestamp":"2021-12-11T05:06:10.556Z","message":"Stopping SessionDispatcherLeaderProcess.","logger_name":"org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess","thread_name":"KubernetesLeaderElector-ExecutorService-thread-1","level":"INFO","level_value":20000}
{"timestamp":"2021-12-11T05:06:10.557Z","message":"Stopping dispatcher akka.tcp://flink@10.244.104.239:6123/user/rpc/dispatcher_1<http://flink@10.244.104.239:6123/user/rpc/dispatcher_1>.","logger_name":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":20000}
{"timestamp":"2021-12-11T05:06:10.558Z","message":"Stopping all currently running jobs of dispatcher akka.tcp://flink@10.244.104.239:6123/user/rpc/dispatcher_1<http://flink@10.244.104.239:6123/user/rpc/dispatcher_1>.","logger_name":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":20000}
{"timestamp":"2021-12-11T05:06:10.560Z","message":"Stopping the JobMaster for job gim(00000000000000000000000000000000).","logger_name":"org.apache.flink.runtime.jobmaster.JobMaster","thread_name":"flink-akka.actor.default-dispatcher-2","level":"INFO","level_value":20000}
{"timestamp":"2021-12-11T05:06:10.565Z","message":"Job gim (00000000000000000000000000000000) switched from state RUNNING to SUSPENDED.","logger_name":"org.apache.flink.runtime.executiongraph.ExecutionGraph","thread_name":"flink-akka.actor.default-dispatcher-2","level":"INFO","level_value":20000,"stack_trace":"org.apache.flink.util.FlinkException: Scheduler is being stopped.\n\tat org.apache.flink.runtime.scheduler.SchedulerBase.closeAsync(SchedulerBase.java:607)\n\tat org.apache.flink.runtime.jobmaster.JobMaster.stopScheduling(JobMaster.java:962)\n\tat org.apache.flink.runtime.jobmaster.JobMaster.stopJobExecution(JobMaster.java:926)\n\tat org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:398)\n\tat org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:214)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:563)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:186)\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)\n\tat scala.PartialFunction.applyOrElse(PartialFunction.scala:123)\n\tat scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)\n\tat akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat akka.actor.Actor.aroundReceive(Actor.scala:517)\n\tat akka.actor.Actor.aroundReceive$(Actor.scala:515)\n\tat akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)\n\tat akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)\n\tat akka.actor.ActorCell.invoke(ActorCell.scala:561)\n\tat akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)\n\tat akka.dispatch.Mailbox.run(Mailbox.scala:225)\n\tat akka.dispatch.Mailbox.exec(Mailbox.scala:235)\n\tat akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)\n\tat akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)\n\tat akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)\n\tat akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)\n"}
{"timestamp":"2021-12-11T05:06:10.566Z","message":"Job 00000000000000000000000000000000 reached terminal state SUSPENDED.","logger_name":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":20000}
...

The TM log shows no errors until the JM starts cancelling tasks:

{"timestamp":"2021-12-11T05:05:50.681Z","message":"Source: voice-callthread, idle: {}","logger_name":"gim.fsp.util.flink.kafka.GKafkaFetcher","thread_name":"g-event-time-alignment-ts-0","level":"INFO","level_value":20000,"operator_name":"Source: voice-callthread"}
{"timestamp":"2021-12-11T05:06:10.574Z","message":"Attempting to cancel task Source: heartbeat -> Sink: heartbeat (1/1)#1 (858bac81394274fbff76471f502b30d0).","logger_name":"org.apache.flink.runtime.taskmanager.Task","thread_name":"flink-akka.actor.default-dispatcher-13","level":"INFO","level_value":20000}
{"timestamp":"2021-12-11T05:06:10.575Z","message":"Source: heartbeat -> Sink: heartbeat (1/1)#1 (858bac81394274fbff76471f502b30d0) switched from RUNNING to CANCELING.","logger_name":"org.apache.flink.runtime.taskmanager.Task","thread_name":"flink-akka.actor.default-dispatcher-13","level":"INFO","level_value":20000}

What is causing the leadership revocation?

Thanks,
Alexey

Re: Flink 1.13.3, k8s HA - ResourceManager was revoked leadership

Posted by David Morávek <dm...@apache.org>.
Hi Alexey,

please be aware that the JSON-based logs in the mail may not make it past
the spam filter (at least for Gmail they did not) :(

K8s-based leader election relies on optimistic locking of the underlying
config-map (roughly: the leader periodically updates a lease annotation on
the config-map). If the JM fails to renew this lease within the deadline,
it loses leadership.
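
To make the mechanism concrete, here is a toy Python model of it. This is
only a sketch and not Flink's or the Kubernetes client's actual code: the
names LeaseConfigMap, Contender and tick are made up for illustration, and
the simulated outage is hypothetical. The 15 s deadline and the 5 s step
mirror what I believe are the defaults of
high-availability.kubernetes.leader-election.renew-deadline and
high-availability.kubernetes.leader-election.retry-period, so treat those
values as assumptions and check them against your configuration.

```python
from dataclasses import dataclass
from typing import Optional


class Conflict(Exception):
    """The object changed between our read and our write."""


@dataclass
class LeaseConfigMap:
    # Toy stand-in for the config-map that carries the leader lease annotation.
    resource_version: int = 0
    holder: Optional[str] = None
    renew_time: float = 0.0

    def replace(self, expected_version: int, holder: str, now: float) -> None:
        # Optimistic locking: reject the write if the version we read is stale.
        if expected_version != self.resource_version:
            raise Conflict("stale resource version")
        self.holder, self.renew_time = holder, now
        self.resource_version += 1


class Contender:
    def __init__(self, name: str, renew_deadline: float = 15.0,
                 lease_duration: float = 15.0):
        self.name = name
        self.renew_deadline = renew_deadline
        self.lease_duration = lease_duration
        self.is_leader = False
        self.last_renew = 0.0

    def tick(self, cm: LeaseConfigMap, now: float, api_reachable: bool = True) -> None:
        """One retry-period step: try to renew the lease, then check the deadline."""
        if api_reachable:
            try:
                version = cm.resource_version  # read
                lease_expired = now - cm.renew_time > self.lease_duration
                if cm.holder in (None, self.name) or lease_expired:
                    cm.replace(version, self.name, now)  # conditional write
                    self.is_leader, self.last_renew = True, now
            except Conflict:
                pass  # somebody else updated first; retry on the next tick
        if self.is_leader and now - self.last_renew > self.renew_deadline:
            self.is_leader = False  # the "was revoked leadership" moment


cm = LeaseConfigMap()
jm = Contender("jm-0")
history = []
for now in range(0, 45, 5):
    # Hypothetical outage: the API server is unreachable from t=10 to t=30.
    jm.tick(cm, float(now), api_reachable=not (10 <= now <= 30))
    history.append((now, jm.is_leader))
print(history)
```

In this model the JM keeps leadership through the first missed renewals and
loses it at t=25, once more than 15 s have passed since the last successful
update at t=5. A slow or unreachable API server, or a JM that is stalled
(for example by long GC pauses), would both show up the same way.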

Can you please elaborate a bit on your setup and your K8s-related Flink
configuration? Also, could you share the whole JM log by any chance (gist /
email attachment)?

Best,
D.
