Posted to user@flink.apache.org by Lei Chen <le...@gmail.com> on 2017/10/13 18:22:26 UTC

Problem scaling up Flink on YARN

Hi,

We're trying to implement a module to help autoscale our pipeline, which is
built with Flink on YARN. According to the documentation, the suggested
procedure seems to be:

1. Cancel the job with a savepoint.
2. Start a new job with an increased number of YARN TaskManagers and a higher parallelism.
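The two steps above can be sketched as a small helper that an autoscaling module might use to build the Flink CLI invocations. This is a minimal sketch, assuming the module shells out to the Flink 1.3-era CLI (`flink cancel -s`, `flink run -m yarn-cluster -yn ... -p ... -s ...`); the class name `RescaleCommands`, the job ID, paths and jar name are all hypothetical placeholders, and later Flink releases no longer use `-yn`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for an autoscaling module. It only builds the Flink CLI
// argument lists for the two steps; flag names follow the Flink 1.3-era CLI
// (-yn = number of YARN TaskManager containers, not used by later releases).
public class RescaleCommands {

    // Step 1: cancel the running job, writing a savepoint into targetDir.
    static List<String> cancelWithSavepoint(String targetDir, String jobId) {
        return new ArrayList<>(Arrays.asList("flink", "cancel", "-s", targetDir, jobId));
    }

    // Step 2: resubmit the unchanged jar as a new per-job YARN cluster with more
    // TaskManagers and a higher parallelism, restoring from the savepoint.
    static List<String> runFromSavepoint(String savepointPath, int taskManagers,
                                         int parallelism, String jarPath) {
        return new ArrayList<>(Arrays.asList(
                "flink", "run",
                "-m", "yarn-cluster",
                "-yn", Integer.toString(taskManagers),
                "-p", Integer.toString(parallelism),
                "-s", savepointPath,
                jarPath));
    }

    public static void main(String[] args) {
        // Placeholder job ID, paths and jar name, for illustration only.
        System.out.println(String.join(" ",
                cancelWithSavepoint("hdfs:///tmp/savepoints", "<jobId>")));
        System.out.println(String.join(" ",
                runFromSavepoint("hdfs:///tmp/savepoints/<savepoint-dir>", 4, 8, "pipeline.jar")));
    }
}
```

The returned lists could be passed to `java.lang.ProcessBuilder` to actually run the commands; extracting the savepoint path from the output of the cancel step is deliberately left out of this sketch.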

However, step 2 always failed with the following error:

Caused by: java.lang.IllegalStateException: Failed to rollback to savepoint
hdfs://10.106.238.14:/tmp/savepoint-767421-20907d234655. Cannot map
savepoint state for operator 37dfe905df17858e07858039ce3d8ae1 to the new
program, because the operator is not available in the new program. If you
want to allow to skip this, you can set the --allowNonRestoredState option
on the CLI.
at org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:130)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1140)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1386)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
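The error says that state stored in the savepoint under one operator ID has no matching operator in the new program. A simplified model of that check, assuming savepoint state is keyed by operator ID, looks like this. It is an illustration only, not Flink's `SavepointLoader` source; the class name and the second and third IDs (`aaaa`, `bbbb`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified model (not Flink's code) of the restore-time check: every operator
// ID found in the savepoint must exist in the new program, unless skipping
// unmapped state has been explicitly allowed.
public class SavepointMappingCheck {

    // Returns the savepoint operator IDs that have no match in the new program.
    static List<String> unmappedOperators(Set<String> savepointOperatorIds,
                                          Set<String> newProgramOperatorIds) {
        List<String> missing = new ArrayList<>();
        for (String id : savepointOperatorIds) {
            if (!newProgramOperatorIds.contains(id)) {
                missing.add(id);
            }
        }
        Collections.sort(missing); // deterministic order for reporting
        return missing;
    }

    // Fails like the restore step does when some state cannot be mapped.
    static void validate(Set<String> savepointOperatorIds,
                         Set<String> newProgramOperatorIds,
                         boolean allowNonRestoredState) {
        List<String> missing = unmappedOperators(savepointOperatorIds, newProgramOperatorIds);
        if (!missing.isEmpty() && !allowNonRestoredState) {
            throw new IllegalStateException("Cannot map savepoint state for operator "
                    + missing.get(0) + " to the new program");
        }
        // Otherwise the unmapped state is simply skipped in this model.
    }

    public static void main(String[] args) {
        Set<String> inSavepoint = new HashSet<>(
                Arrays.asList("37dfe905df17858e07858039ce3d8ae1", "aaaa"));
        Set<String> inNewProgram = new HashSet<>(Arrays.asList("aaaa", "bbbb"));
        System.out.println(unmappedOperators(inSavepoint, inNewProgram));
        // prints [37dfe905df17858e07858039ce3d8ae1]
    }
}
```

In this model, the restore can only succeed without `--allowNonRestoredState` if the new program produces exactly the operator IDs recorded in the savepoint.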

I should also mention that the procedure worked fine when the parallelism was
not changed.

The documentation does mention assigning operator IDs manually. I'm curious
whether that is mandatory to fix the problem I'm seeing, given that my program
doesn't change at all, so the auto-generated operator IDs should stay the same
after the parallelism is increased?
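The question hinges on which inputs an auto-generated ID depends on. The toy model below is hypothetical and is NOT Flink's actual hashing code: it only illustrates that an ID computed from graph properties (here, position and number of chained outputs) changes whenever one of those properties changes, while an ID derived solely from a user-chosen string cannot. Whether Flink's own generated IDs depend on something that a parallelism change can affect (for example operator chaining) is an assumption here, not something established in this post. In a real Flink job, IDs are pinned by calling `uid(String)` on each operator, e.g. `.uid("my-window-op")`, where the name is a hypothetical example.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Toy model, NOT Flink's implementation: contrasts an operator ID derived from
// graph properties with one derived only from a user-supplied uid string.
public class OperatorIdModel {

    // Hex-encoded MD5 of the input string.
    static String md5Hex(String input) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder();
            for (byte b : digest) {
                sb.append(String.format("%02x", b));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // MD5 is required on every JVM
        }
    }

    // Hypothetical auto-generated ID: depends on structural properties of the
    // graph, so any change in those properties yields a different ID.
    static String autoId(int positionInGraph, int chainedOutputs) {
        return md5Hex("pos=" + positionInGraph + ";chained=" + chainedOutputs);
    }

    // Explicit ID: depends only on the string the user chose.
    static String explicitId(String uid) {
        return md5Hex("uid=" + uid);
    }

    public static void main(String[] args) {
        // Same operator before and after a hypothetical change in chaining.
        System.out.println(autoId(2, 1).equals(autoId(2, 0)));                 // false
        System.out.println(explicitId("my-window-op").equals(explicitId("my-window-op"))); // true
    }
}
```

The design point is simply that an explicit uid removes any dependence on how the job graph happens to be built, which is why manual IDs are the robust choice when a job is expected to be restored from savepoints.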

thanks,
Lei