Posted to issues@flink.apache.org by "Andrey Zagrebin (JIRA)" <ji...@apache.org> on 2019/04/09 12:20:00 UTC

[jira] [Commented] (FLINK-8902) Re-scaling job sporadically fails with KeeperException

    [ https://issues.apache.org/jira/browse/FLINK-8902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813325#comment-16813325 ] 

Andrey Zagrebin commented on FLINK-8902:
----------------------------------------

I managed to reproduce it quite quickly locally with a standalone setup and the embedded bin/zookeeper.sh to enable HA.
The reason seems to be the following.

JobMaster.rescaleOperators takes a savepoint in getJobModificationSavepoint, which eventually calls ZooKeeperCompletedCheckpointStore.addCheckpoint.
This creates a node and a lock in ZooKeeperStateHandleStore for the savepoint.

Then JobMaster.rescaleOperators restores the new execution graph in restoreExecutionGraphFromRescalingSavepoint. This eventually calls ZooKeeperCompletedCheckpointStore.addCheckpoint again for the same savepoint. The problem is that, although this happens in another ZooKeeperCompletedCheckpointStore instance, it uses the same underlying ZooKeeper storage, where a node with the same checkpoint id already exists. The second attempt to create the node therefore fails with the NodeExistsException seen in the stack trace.

In the non-HA scenario, we need to import the savepoint into the new checkpoint storage of the rescaled execution graph, but in HA mode it turns out to be the same underlying ZooKeeper storage.
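
To illustrate the collision described above, here is a minimal, self-contained sketch. It does not use Flink or ZooKeeper at all: FakeZooKeeper and FakeCheckpointStore are hypothetical stand-ins for the ZooKeeper storage and for ZooKeeperCompletedCheckpointStore, and the path and checkpoint id are made up. It only models the assumption that two store instances share one storage and that creating an existing node fails.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SharedStoreSketch {

    /** Hypothetical stand-in for the shared ZooKeeper storage: path -> data. */
    static final class FakeZooKeeper {
        private final Map<String, String> nodes = new ConcurrentHashMap<>();

        /** Mimics a create that fails when the node is already present. */
        void create(String path, String data) {
            if (nodes.putIfAbsent(path, data) != null) {
                throw new IllegalStateException("NodeExists: " + path);
            }
        }
    }

    /** Hypothetical stand-in for a ZooKeeper-backed completed checkpoint store. */
    static final class FakeCheckpointStore {
        private final FakeZooKeeper zk;
        private final String root;

        FakeCheckpointStore(FakeZooKeeper zk, String root) {
            this.zk = zk;
            this.root = root;
        }

        /** Registers a checkpoint under a node named after its id. */
        void addCheckpoint(long checkpointId, String handle) {
            zk.create(root + "/" + checkpointId, handle);
        }
    }

    /** Returns true if the second add of the same savepoint collides. */
    static boolean secondAddCollides() {
        FakeZooKeeper zk = new FakeZooKeeper();
        String root = "/checkpoints/job"; // made-up path, for illustration only

        // Two distinct store instances, but the same underlying storage and path.
        FakeCheckpointStore beforeRescale = new FakeCheckpointStore(zk, root);
        FakeCheckpointStore afterRescale = new FakeCheckpointStore(zk, root);

        // Taking the rescaling savepoint registers it once ...
        beforeRescale.addCheckpoint(42L, "savepoint-handle");
        try {
            // ... and restoring from it tries to register the same id again.
            afterRescale.addCheckpoint(42L, "savepoint-handle");
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("second add collides: " + secondAddCollides());
    }
}
```

In this model the second addCheckpoint always fails, whereas the reported failure is sporadic, so the sketch only captures the shared-storage aspect and not whatever timing makes the real issue intermittent.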

> Re-scaling job sporadically fails with KeeperException
> ------------------------------------------------------
>
>                 Key: FLINK-8902
>                 URL: https://issues.apache.org/jira/browse/FLINK-8902
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.5.0, 1.6.0
>         Environment: Commit: 80020cb
> Hadoop: 2.8.3
> YARN
>  
>            Reporter: Gary Yao
>            Assignee: Andrey Zagrebin
>            Priority: Critical
>              Labels: flip6
>             Fix For: 1.7.3, 1.6.5
>
>
> *Description*
>  Re-scaling a job with {{bin/flink modify -p <new_parallelism>}} sporadically fails with a {{KeeperException}}
> *Steps to reproduce*
>  # Submit job to Flink cluster with flip6 enabled running on YARN (session mode).
>  # Re-scale job (5-20 times)
> *Stacktrace (client)*
> {noformat}
> org.apache.flink.util.FlinkException: Could not rescale job 61e2e99db2e959ebd94e40f9c5e816bc.
> 	at org.apache.flink.client.cli.CliFrontend.lambda$modify$8(CliFrontend.java:766)
> 	at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:954)
> 	at org.apache.flink.client.cli.CliFrontend.modify(CliFrontend.java:757)
> 	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1037)
> 	at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1095)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:422)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> 	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> 	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1095)
> Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmaster.exceptions.JobModificationException: Could not restore from temporary rescaling savepoint. This might indicate that the savepoint hdfs://172.31.33.72:9000/flinkha/savepoints/savepoint-61e2e9-fdb3d05a0035 got corrupted. Deleting this savepoint as a precaution.
> 	at org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$3(JobMaster.java:525)
> 	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
> 	at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
> 	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:295)
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:150)
> 	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66)
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
> 	at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
> 	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> 	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> 	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.jobmaster.exceptions.JobModificationException: Could not restore from temporary rescaling savepoint. This might indicate that the savepoint hdfs://172.31.33.72:9000/flinkha/savepoints/savepoint-61e2e9-fdb3d05a0035 got corrupted. Deleting this savepoint as a precaution.
> 	at org.apache.flink.runtime.jobmaster.JobMaster.lambda$restoreExecutionGraphFromRescalingSavepoint$17(JobMaster.java:1317)
> 	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
> 	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> 	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.ConcurrentModificationException: ZooKeeper unexpectedly modified
> 	at org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.addAndLock(ZooKeeperStateHandleStore.java:168)
> 	at org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.addCheckpoint(ZooKeeperCompletedCheckpointStore.java:233)
> 	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1088)
> 	at org.apache.flink.runtime.jobmaster.JobMaster.tryRestoreExecutionGraphFromSavepoint(JobMaster.java:1161)
> 	at org.apache.flink.runtime.jobmaster.JobMaster.lambda$restoreExecutionGraphFromRescalingSavepoint$17(JobMaster.java:1297)
> 	... 10 more
> Caused by: org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists
> 	at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException.create(KeeperException.java:119)
> 	at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper.multiInternal(ZooKeeper.java:1006)
> 	at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper.multi(ZooKeeper.java:910)
> 	at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorTransactionImpl.doOperation(CuratorTransactionImpl.java:159)
> 	at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorTransactionImpl.access$200(CuratorTransactionImpl.java:44)
> 	at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorTransactionImpl$2.call(CuratorTransactionImpl.java:129)
> 	at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorTransactionImpl$2.call(CuratorTransactionImpl.java:125)
> 	at org.apache.flink.shaded.curator.org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:109)
> 	at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorTransactionImpl.commit(CuratorTransactionImpl.java:122)
> 	at org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.addAndLock(ZooKeeperStateHandleStore.java:162)
> 	... 14 more
> {noformat}


