You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Anton Kalashnikov (Jira)" <ji...@apache.org> on 2021/04/20 15:37:00 UTC
[jira] [Created] (FLINK-22379) Introduce a new JobStatus to avoid
premature checkpoint triggering
Anton Kalashnikov created FLINK-22379:
-----------------------------------------
Summary: Introduce a new JobStatus to avoid premature checkpoint triggering
Key: FLINK-22379
URL: https://issues.apache.org/jira/browse/FLINK-22379
Project: Flink
Issue Type: Improvement
Components: Runtime / Checkpointing
Reporter: Anton Kalashnikov
Right now, when JobStatus switches to RUNNING it allows CheckpointCoordinator to trigger checkpoint which is ok. But unfortunately, JobStatus switches to RUNNING before TaskState(ExecutionState) switches even to SCHEDULED. And this leads to several problems, one of them you can see in the log:
{noformat}
WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Failed to trigger checkpoint for job bc943302f92d979824fbc8f4cabc5db3.)org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering task Source: EventSource -> Timestamps/Watermarks (1/7) of job bc943302f92d979824fbc8f4cabc5db3 has not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running. at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:152) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:114) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_272] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]{noformat}
To avoid this problem, it is a good idea to introduce new JobStatus between CREATED and RUNNING(RESTORING?). And then:
* JobStatus CREATED switches to RESTORING at the same time when right now CREATED switches to RUNNING
* JobStatus RESTORING switches to RUNNING when all tasks switched their states from INITIALIZING to RUNNING
It also makes sense to rename ExecutionState.INITIALIZING to RESTORING in order to have the same name for job and task.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)