Posted to dev@flink.apache.org by "Anton Kalashnikov (Jira)" <ji...@apache.org> on 2021/04/20 15:37:00 UTC

[jira] [Created] (FLINK-22379) Introduce a new JobStatus to avoid premature checkpoint triggering

Anton Kalashnikov created FLINK-22379:
-----------------------------------------

             Summary: Introduce a new JobStatus to avoid premature checkpoint triggering
                 Key: FLINK-22379
                 URL: https://issues.apache.org/jira/browse/FLINK-22379
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / Checkpointing
            Reporter: Anton Kalashnikov


Currently, as soon as JobStatus switches to RUNNING, the CheckpointCoordinator is allowed to trigger checkpoints, which is fine in itself. Unfortunately, JobStatus switches to RUNNING before the task state (ExecutionState) has even switched to SCHEDULED. This leads to several problems, one of which is visible in the following log:
{noformat}
WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Failed to trigger checkpoint for job bc943302f92d979824fbc8f4cabc5db3.)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering task Source: EventSource -> Timestamps/Watermarks (1/7) of job bc943302f92d979824fbc8f4cabc5db3 has not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running.
    at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:152) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:114) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_272]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
{noformat}
To avoid this problem, it would be a good idea to introduce a new JobStatus between CREATED and RUNNING (perhaps RESTORING?). Then:
 * JobStatus switches from CREATED to RESTORING at the point where it currently switches from CREATED to RUNNING
 * JobStatus switches from RESTORING to RUNNING once all tasks have switched their state from INITIALIZING to RUNNING
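
The two transitions above could be sketched roughly as follows. This is only an illustration of the idea, not Flink code: the class JobStatusSketch, its enums, and the methods startScheduling, onTaskStatesChanged and mayTriggerCheckpoint are hypothetical names made up for this example, and the task states are a simplified subset.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the proposal; none of these types are Flink's real classes.
public class JobStatusSketch {

    // Job-level status, including the proposed intermediate RESTORING state.
    enum JobStatus { CREATED, RESTORING, RUNNING }

    // Simplified subset of task-level execution states.
    enum TaskState { CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING }

    private JobStatus status = JobStatus.CREATED;

    JobStatus getStatus() {
        return status;
    }

    // Invoked at the point where the job currently goes CREATED -> RUNNING.
    void startScheduling() {
        if (status == JobStatus.CREATED) {
            status = JobStatus.RESTORING;
        }
    }

    // Invoked after any task state change; promotes the job only when every task is RUNNING.
    void onTaskStatesChanged(List<TaskState> taskStates) {
        boolean allRunning = !taskStates.isEmpty()
                && taskStates.stream().allMatch(s -> s == TaskState.RUNNING);
        if (status == JobStatus.RESTORING && allRunning) {
            status = JobStatus.RUNNING;
        }
    }

    // Checkpoint triggering stays gated on the job status alone.
    boolean mayTriggerCheckpoint() {
        return status == JobStatus.RUNNING;
    }

    public static void main(String[] args) {
        JobStatusSketch job = new JobStatusSketch();
        job.startScheduling();

        // One task is still initializing, so the job must stay in RESTORING.
        job.onTaskStatesChanged(Arrays.asList(TaskState.RUNNING, TaskState.INITIALIZING));
        System.out.println(job.getStatus() + " " + job.mayTriggerCheckpoint()); // RESTORING false

        // All tasks are running, so the job becomes RUNNING and checkpoints are allowed.
        job.onTaskStatesChanged(Arrays.asList(TaskState.RUNNING, TaskState.RUNNING));
        System.out.println(job.getStatus() + " " + job.mayTriggerCheckpoint()); // RUNNING true
    }
}
```

With such a gate, the coordinator would not attempt a checkpoint while any task is still starting up, so the "Not all required tasks are currently running" failure shown in the log should not be hit during startup.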


It also makes sense to rename ExecutionState.INITIALIZING to RESTORING so that the job and its tasks use the same name for this state.


