Posted to issues@flink.apache.org by "Nicholas Jiang (Jira)" <ji...@apache.org> on 2022/05/18 01:45:00 UTC

[jira] [Comment Edited] (FLINK-27257) Flink kubernetes operator triggers savepoint failed because of not all tasks running

    [ https://issues.apache.org/jira/browse/FLINK-27257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17538539#comment-17538539 ] 

Nicholas Jiang edited comment on FLINK-27257 at 5/18/22 1:44 AM:
-----------------------------------------------------------------

I discussed this offline with [~wangyang0918]. It's hard to improve the isJobRunning implementation because the JobStatusMessages returned by ClusterClient#listJobs() don't contain the tasksPerState of the JobDetail. As a result, there is no way to judge whether the ExecutionState of every task in the job is running, and for batch tasks no clear way to judge whether the job is really running.

The current idea is that if the error is found to be "Not all required tasks are currently running", the savepoint is triggered again in the next reconciliation, and this repeats until it is triggered successfully.
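
A minimal sketch of that idea, only to make the proposal concrete. The class, enum, and method names are hypothetical and are not the operator's actual code; the only thing taken from this issue is the failure-reason text in the log. Matching on the message string is an assumption made for illustration, and a real implementation may prefer a more robust signal.

```java
public class SavepointRetrySketch {

    // Failure reason copied from the log in this issue.
    static final String TASKS_NOT_RUNNING = "Not all required tasks are currently running";

    // Hypothetical outcomes of one reconciliation pass; not operator types.
    enum Action { DONE, RETRY_NEXT_RECONCILE, FAIL }

    // Decide what to do with the result of a savepoint trigger attempt.
    static Action decide(Throwable error) {
        if (error == null) {
            return Action.DONE;
        }
        // Walk the cause chain, since the failure may arrive wrapped.
        for (Throwable t = error; t != null; t = t.getCause()) {
            String message = t.getMessage();
            if (message != null && message.contains(TASKS_NOT_RUNNING)) {
                return Action.RETRY_NEXT_RECONCILE;
            }
        }
        return Action.FAIL;
    }

    public static void main(String[] args) {
        Throwable transientError = new RuntimeException(
                new IllegalStateException(
                        "Aborting checkpoint. Failure reason: " + TASKS_NOT_RUNNING + "."));
        System.out.println(decide(null));
        System.out.println(decide(transientError));
        System.out.println(decide(new RuntimeException("unrelated failure")));
    }
}
```

With this shape, any other error still fails the savepoint immediately, and only the "tasks not running yet" case is deferred to the next reconciliation.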

[~gyfora][~wangyang0918] WDYT?


> Flink kubernetes operator triggers savepoint failed because of not all tasks running
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-27257
>                 URL: https://issues.apache.org/jira/browse/FLINK-27257
>             Project: Flink
>          Issue Type: Bug
>          Components: Kubernetes Operator
>            Reporter: Yang Wang
>            Assignee: Nicholas Jiang
>            Priority: Major
>             Fix For: kubernetes-operator-1.1.0
>
>
> {code:java}
> 2022-04-15 02:38:56,551 o.a.f.k.o.s.FlinkService       [INFO ][default/flink-example-statemachine] Fetching savepoint result with triggerId: 182d7f176496856d7b33fe2f3767da18
> 2022-04-15 02:38:56,690 o.a.f.k.o.s.FlinkService       [ERROR][default/flink-example-statemachine] Savepoint error
> org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering task Source: Custom Source (1/2) of job 00000000000000000000000000000000 is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running.
>     at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:143)
>     at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:105)
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455)
>     at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at akka.actor.Actor.aroundReceive(Actor.scala:537)
>     at akka.actor.Actor.aroundReceive$(Actor.scala:535)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:548)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:231)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>     at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>     at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>     at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> 2022-04-15 02:38:56,693 o.a.f.k.o.o.SavepointObserver  [ERROR][default/flink-example-statemachine] Checkpoint triggering task Source: Custom Source (1/2) of job 00000000000000000000000000000000 is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running. {code}
> How to reproduce?
> Update arbitrary fields (e.g. parallelism) along with {{savepointTriggerNonce}}.
>  
> The root cause might be that the running state returned by {{ClusterClient#listJobs()}} does not mean that all the tasks are running.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)