Posted to user@flink.apache.org by Data Engineer <da...@gmail.com> on 2018/07/11 07:53:41 UTC

Cancelling job with savepoint fails sometimes

I have noticed that when I try to cancel a Flink job with a savepoint,
the cancel sometimes fails with the following error:

org.apache.flink.util.FlinkException: Could not cancel job 3be3d380dca9bb6a5cf0d559d54d7ff8.
        at org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:581)
        at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:955)
        at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:573)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1029)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
        at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointTriggerException: Failed to trigger savepoint. Decline reason: Not all required tasks are currently running.
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        at org.apache.flink.client.program.rest.RestClusterClient.cancelWithSavepoint(RestClusterClient.java:385)
        at org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:579)
        ... 6 more
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointTriggerException: Failed to trigger savepoint. Decline reason: Not all required tasks are currently running.
        at org.apache.flink.runtime.jobmaster.JobMaster.lambda$triggerSavepoint$13(JobMaster.java:959)
        at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
        at java.util.concurrent.CompletableFuture.uniExceptionallyStage(CompletableFuture.java:884)
        at java.util.concurrent.CompletableFuture.exceptionally(CompletableFuture.java:2196)
        at org.apache.flink.runtime.jobmaster.JobMaster.triggerSavepoint(JobMaster.java:955)
        at sun.reflect.GeneratedMethodAccessor78.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
        at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointTriggerException: Failed to trigger savepoint. Decline reason: Not all required tasks are currently running.
        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
        at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
        at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:614)
        at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1983)
        at org.apache.flink.runtime.jobmaster.JobMaster.triggerSavepoint(JobMaster.java:947)
        ... 20 more
Caused by: org.apache.flink.runtime.checkpoint.CheckpointTriggerException: Failed to trigger savepoint. Decline reason: Not all required tasks are currently running.
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerSavepoint(CheckpointCoordinator.java:377)
        at org.apache.flink.runtime.jobmaster.JobMaster.triggerSavepoint(JobMaster.java:946)
        ... 20 more


Also, I see the following lines in the JobManager logs:
2018-07-11 05:41:13,316 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint triggering task Source: Custom Source -> Flat Map (1/3) of job e691fa002c682703735afb178ce6ba37 is not being executed at the moment. Aborting checkpoint.
2018-07-11 05:41:13,517 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint triggering task Source: Custom Source -> Flat Map (1/3) of job e691fa002c682703735afb178ce6ba37 is not being executed at the moment. Aborting checkpoint.
2018-07-11 05:41:13,716 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint triggering task Source: Custom Source -> Flat Map (1/3) of job e691fa002c682703735afb178ce6ba37 is not being executed at the moment. Aborting checkpoint.

Retrying the cancel at this point doesn't help. It keeps failing with the
same error until the job runs to completion.
Note that this issue is intermittent; it does not happen on every cancel.

Do I need to do anything in particular in my application source and sink
checkpointing code? Have I forgotten to take care of something? I am using
flink-1.5.0.

I came across a similar issue here, but I don't see any updates:
http://mail-archives.apache.org/mod_mbox/flink-issues/201706.mbox/%3CJIRA.13082229.1498251834000.96074.1498260420026@Atlassian.JIRA%3E

Regards,
James

Re: Cancelling job with savepoint fails sometimes

Posted by Chesnay Schepler <ch...@apache.org>.
My guess is that this is related to
https://issues.apache.org/jira/browse/FLINK-2491.

The relevant bit is "Failed to trigger savepoint. Decline reason: Not 
all required tasks are currently running."

So, if one task has already finished (for example, a source with a small
finite input), the savepoint cannot be taken. The same may apply if
a task is currently restarting, failing, etc.
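
One way to check for this condition before cancelling is to look at the
per-vertex status that the JobManager's REST API reports for the job. The
following is a minimal sketch, not an official tool. It assumes the job
details JSON has a "vertices" list whose entries carry "name" and "status"
fields; please verify this against the REST API of your Flink version. The
function name and the sample payload are made up for illustration.

```python
def vertices_not_running(job_details):
    """Return (name, status) for every vertex whose status is not RUNNING.

    job_details is the parsed job details JSON. The "vertices", "name" and
    "status" keys are assumptions about that payload, not a guaranteed schema.
    """
    return [
        (vertex.get("name", "<unnamed>"), vertex.get("status", "UNKNOWN"))
        for vertex in job_details.get("vertices", [])
        if vertex.get("status") != "RUNNING"
    ]


# Hypothetical payload: a finite source has finished, the sink still runs.
sample = {
    "jid": "e691fa002c682703735afb178ce6ba37",
    "vertices": [
        {"name": "Source: Custom Source -> Flat Map", "status": "FINISHED"},
        {"name": "Sink: Unnamed", "status": "RUNNING"},
    ],
}

blocking = vertices_not_running(sample)
if blocking:
    # A savepoint would likely be declined while any vertex is listed here.
    for name, status in blocking:
        print("not running:", name, "(" + status + ")")
else:
    print("all vertices are running; a savepoint can be attempted")
```

In practice the dictionary would come from the JobManager rather than a
hard-coded sample. If the list is non-empty because a vertex has already
finished, retrying cancel-with-savepoint is unlikely to succeed, which matches
the behaviour described above.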
