You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Lei Chen <le...@gmail.com> on 2017/10/16 04:28:51 UTC

problem scale Flink job on YARN

Hi,

We're trying to implement some module to help autoscale our pipeline which
is built  with Flink on YARN. According to the document, the suggested
procedure seems to be:

1. cancel job with savepoint
2. start new job with increased YARN TM number and parallelism.

However, step 2 always gave error

Caused by: java.lang.IllegalStateException: Failed to rollback to savepoint
hdfs://10.106.238.14:/tmp/savepoint-767421-20907d234655. Cannot map
savepoint state for operator 37dfe905df17858e07858039ce3d8ae1 to the new
program, because the operator is not available in the new program. If you
want to allow to skip this, you can set the --allowNonRestoredState option
on the CLI.
at org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.
loadAndValidateSavepoint(SavepointLoader.java:130)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.
restoreSavepoint(CheckpointCoordinator.java:1140)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$
runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(
JobManager.scala:1386)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$
runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$
runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.
liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(
Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
ForkJoinWorkerThread.java:107)

The procedure worked fine if parallelism was not changed.

Also want to mention that I didn't manually specify OperatorID in my job. The
document does mentioned manually OperatorID assignment is suggested, just
curious is that mandatory in my case to fix the problem I'm seeing, given
that my program doesn't change at all so the autogenerated operatorID
should be unchanged after parallelism increase?

thanks,
Lei

Re: problem scale Flink job on YARN

Posted by Lei Chen <le...@gmail.com>.
Hi Aljoscha,

I'm using version 1.3.0 and changing job-wide parallelism.

Lei

On Thu, Oct 19, 2017 at 9:47 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi Lei,
>
> Which version of Flink would that be? I'm guessing >= 1.3.x? In Flink 1.1
> the hash of an operator was tied to the parallelism but starting with 1.2
> that shouldn't happen anymore.
>
> Also, are you changing the parallelism job-wide or are there operators
> with differing parallelism? For example, could there be a source with
> parallelism 1 and an operator that had parallelism 1 after that which now
> has a different parallelism?
>
> Best,
> Aljoscha
>
>
> On 16. Oct 2017, at 06:28, Lei Chen <le...@gmail.com> wrote:
>
> Hi,
>
> We're trying to implement some module to help autoscale our pipeline which
> is built  with Flink on YARN. According to the document, the suggested
> procedure seems to be:
>
> 1. cancel job with savepoint
> 2. start new job with increased YARN TM number and parallelism.
>
> However, step 2 always gave error
>
> Caused by: java.lang.IllegalStateException: Failed to rollback to
> savepoint hdfs://10.106.238.14:/tmp/savepoint-767421-20907d234655. Cannot
> map savepoint state for operator 37dfe905df17858e07858039ce3d8ae1 to the
> new program, because the operator is not available in the new program. If
> you want to allow to skip this, you can set the --allowNonRestoredState
> option on the CLI.
> at org.apache.flink.runtime.checkpoint.savepoint.SavepointLoade
> r.loadAndValidateSavepoint(SavepointLoader.java:130)
> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.re
> storeSavepoint(CheckpointCoordinator.java:1140)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$
> apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobMa
> nager.scala:1386)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$
> apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.s
> cala:1372)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$
> apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.s
> cala:1372)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.lifte
> dTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(F
> uture.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.
> exec(AbstractDispatcher.scala:397)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(
> ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPoo
> l.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinW
> orkerThread.java:107)
>
> The procedure worked fine if parallelism was not changed.
>
> Also want to mention that I didn't manually specify OperatorID in my job. The
> document does mentioned manually OperatorID assignment is suggested, just
> curious is that mandatory in my case to fix the problem I'm seeing, given
> that my program doesn't change at all so the autogenerated operatorID
> should be unchanged after parallelism increase?
>
> thanks,
> Lei
>
>
>

Re: problem scale Flink job on YARN

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Lei,

Which version of Flink would that be? I'm guessing >= 1.3.x? In Flink 1.1 the hash of an operator was tied to the parallelism but starting with 1.2 that shouldn't happen anymore.

Also, are you changing the parallelism job-wide or are there operators with differing parallelism? For example, could there be a source with parallelism 1 and an operator that had parallelism 1 after that which now has a different parallelism?

Best,
Aljoscha

> On 16. Oct 2017, at 06:28, Lei Chen <le...@gmail.com> wrote:
> 
> Hi, 
> 
> We're trying to implement some module to help autoscale our pipeline which is built  with Flink on YARN. According to the document, the suggested procedure seems to be:
> 
> 1. cancel job with savepoint
> 2. start new job with increased YARN TM number and parallelism. 
> 
> However, step 2 always gave error 
> 
> Caused by: java.lang.IllegalStateException: Failed to rollback to savepoint hdfs://10.106.238.14:/tmp/savepoint-767421-20907d234655. Cannot map savepoint state for operator 37dfe905df17858e07858039ce3d8ae1 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
> 	at org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:130)
> 	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1140)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1386)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
> 	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> 	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> 	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 
> The procedure worked fine if parallelism was not changed. 
> 
> Also want to mention that I didn't manually specify OperatorID in my job. The document does mentioned manually OperatorID assignment is suggested, just curious is that mandatory in my case to fix the problem I'm seeing, given that my program doesn't change at all so the autogenerated operatorID should be unchanged after parallelism increase?
> 
> thanks,
> Lei