You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Anirudh Mallem <an...@247-inc.com> on 2016/10/18 21:07:46 UTC

Issue while restarting from SavePoint

Hi,
I am relatively new to Flink and I was experimenting with the save points feature. I have an HA cluster running with 1 Master and 4 Workers. The flink-config.yaml is as follows :

#==============================================================================
# Common
#==============================================================================
jobmanager.rpc.address: stable-stream-master01.app.shared.int.sv2.247-inc.net

# The port where the JobManager's main actor system listens for messages.
jobmanager.rpc.port: 6123

# The heap size for the JobManager JVM
jobmanager.heap.mb: 512

# The heap size for the TaskManager JVM
taskmanager.heap.mb: 2048

# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
taskmanager.numberOfTaskSlots: 8

# Specify whether TaskManager memory should be allocated when starting up (true) or when
# memory is required in the memory manager (false)
taskmanager.memory.preallocate: false

# The parallelism used for programs that did not specify and other parallelism.
parallelism.default: 1

env.java.home: /usr/local/java

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2
restart-strategy.fixed-delay.delay: 10 s
#==============================================================================
# Web Frontend
#==============================================================================

# The port under which the web-based runtime monitor listens.
# A value of -1 deactivates the web server.

jobmanager.web.port: 8081

# Flag to specify whether job submission is enabled from the web-based
# runtime monitor. Uncomment to disable.

#jobmanager.web.submit.enable: false

#==============================================================================
# Streaming state checkpointing
#==============================================================================

# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends: jobmanager, filesystem, <class-name-of-factory>
#
state.backend: filesystem


# Directory for storing checkpoints in a Flink-supported filesystem
# Note: State backend must be accessible from the JobManager and all TaskManagers.
# Use "hdfs://" for HDFS setups, "file://" for UNIX/POSIX-compliant file systems,
# (or any local file system under Windows), or "S3://" for S3 file system.
#
 state.backend.fs.checkpointdir: file:///home/amallem/
 state.savepoints.dir: file:///home/amallem/save/

#==============================================================================
# Master High Availability (required configuration)
#==============================================================================

# The list of ZooKepper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2[:clientPort],..." (default clientPort: 2181)
#
 recovery.mode: zookeeper
#
 recovery.zookeeper.quorum: stable-stream-zookeeper01.app.shared.int.net:2181,stable-stream-zookeeper02.app.shared.int.net:2181
#
# Note: You need to set the state backend to 'filesystem' and the checkpoint
# directory (see above) before configuring the storageDir.
#
 recovery.zookeeper.storageDir: file:///home/amallem/recovery
 recovery.zookeeper.path.root: /flink
 recovery.zookeeper.path.namespace: /cluster_one

Query : I am able to take a save point and then cancel my current job but when I try to start the job again using the save point I get the error stating “ProgramInvocationException: JobManager did not respond within 60000ms”. I checked Zookeeper and everything seems fine. I also followed the recommendations in the following post : http://stackoverflow.com/questions/36625742/cant-deploy-flow-to-ha-cluster-of-apache-flink-using-flink-cli
 Is there any configuration I am missing to enable restarting of the job. Any help will be appreciated. Thanks.

Regards,
Anirudh

Re: Issue while restarting from SavePoint

Posted by Anirudh Mallem <an...@247-inc.com>.
Hi Ufuk,
Thank you for looking into the issue. Please find your answers below :

(1) In detached mode the configuration seems to be not picked up
correctly. That should be independent of the savepoints. Can you
confirm this?
—> I tried starting a new job in detached mode and the job started on the cluster.


(2) The program was changed in a non-compatible way after the
savepoint. Did you change the program and if yes in which way?
—> No, I did not make any change to the existing job. I tried restarting the same job. 


However, I think I have found the problem. I was not mentioning the parallelism specifically when restarting the job from the savepoint. I assumed that this information was also captured in the save point. So the non-detached mode was actually throwing the right error but the detached mode was not picking up the config. I guess the detached mode should also have thrown the same exception right? Thanks a lot for helping.



On 10/19/16, 1:19 AM, "Ufuk Celebi" <uc...@apache.org> wrote:

>Hey Anirudh!
>
>As you say, this looks like two issues:
>
>(1) In detached mode the configuration seems to be not picked up
>correctly. That should be independent of the savepoints. Can you
>confirm this?
>
>(2) The program was changed in a non-compatible way after the
>savepoint. Did you change the program and if yes in which way?
>
>– Ufuk
>
>
>On Wed, Oct 19, 2016 at 12:01 AM, Anirudh Mallem
><an...@247-inc.com> wrote:
>> Hi,
>> The issue seems to be connected with trying to restart the job in the
>> detached mode. The stack trace is as follows:
>>
>> -bash-3.2$ bin/flink run -d -s jobmanager://savepoints/1 -c
>> com.tfs.rtdp.precompute.Flink.FlinkTest
>> /tmp/flink-web-upload-2155906f-be54-47f3-b9f7-7f6f0f54f74b/448724f9-f69f-455f-99b9-c57289657e29_uber-flink-test-1.0-SNAPSHOT.jar
>> file:///home/amallem/capitalone.properties
>> Cluster configuration: Standalone cluster with JobManager at
>> /10.64.119.90:33167
>> Using address 10.64.119.90:33167 to connect to JobManager.
>> JobManager web interface address http://10.64.119.90:8081
>> Starting execution of program
>> Submitting Job with JobID: 6c5596772627d3c9366deaa0c47ab0ad. Returning after
>> job submission.
>>
>> ------------------------------------------------------------
>>  The program finished with the following exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The program
>> execution failed: JobManager did not respond within 60000 milliseconds
>> at
>> org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:432)
>> at
>> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:93)
>> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378)
>> at
>> org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:75)
>> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:323)
>> at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
>> at
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
>> Caused by: org.apache.flink.runtime.client.JobTimeoutException: JobManager
>> did not respond within 60000 milliseconds
>> at
>> org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:227)
>> at
>> org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:429)
>> ... 8 more
>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
>> [60000 milliseconds]
>> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>> at
>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>> at scala.concurrent.Await$.result(package.scala:107)
>> at scala.concurrent.Await.result(package.scala)
>> at
>> org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:223)
>> ... 9 more
>>
>> When running it without the detached mode, the job manager is responding but
>> the job is failing as it thinks that the structure of the job is modified.
>>
>> -bash-3.2$ bin/flink run -s jobmanager://savepoints/1 -c
>> com.tfs.rtdp.precompute.Flink.FlinkTest
>> /tmp/flink-web-upload-2155906f-be54-47f3-b9f7-7f6f0f54f74b/448724f9-f69f-455f-99b9-c57289657e29_uber-flink-test-1.0-SNAPSHOT.jar
>> file:///home/amallem/capitalone.properties
>> Cluster configuration: Standalone cluster with JobManager at
>> /10.64.119.90:33167
>> Using address 10.64.119.90:33167 to connect to JobManager.
>> JobManager web interface address http://10.64.119.90:8081
>> Starting execution of program
>> Submitting job with JobID: fe7bf69676a5127eecb392e3a0743c6d. Waiting for job
>> completion.
>> Connected to JobManager at
>> Actor[akka.tcp://flink@10.64.119.90:33167/user/jobmanager#-2047134036]
>> 10/18/2016 14:56:11 Job execution switched to status FAILING.
>> org.apache.flink.runtime.execution.SuppressRestartsException: Unrecoverable
>> failure. This suppresses job restarts. Please check the stack trace for the
>> root cause.
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1305)
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1291)
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1291)
>> at
>> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>> at
>> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: java.lang.IllegalStateException: Failed to rollback to savepoint
>> jobmanager://savepoints/1. Cannot map old state for task
>> 4ca3b7c2641c4299a3378fab220c4e5c to the new program. This indicates that the
>> program has been changed in a non-compatible way  after the savepoint.
>> at
>> org.apache.flink.runtime.checkpoint.savepoint.SavepointCoordinator.restoreSavepoint(SavepointCoordinator.java:248)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.restoreSavepoint(ExecutionGraph.java:983)
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1302)
>> ... 10 more
>> 10/18/2016 14:56:11 Source: Custom Source(1/1) switched to CANCELED
>> 10/18/2016 14:56:11 Map(1/1) switched to CANCELED
>> 10/18/2016 14:56:11 Filter(1/1) switched to CANCELED
>> 10/18/2016 14:56:11 Map(1/1) switched to CANCELED
>> 10/18/2016 14:56:11 Map(1/1) switched to CANCELED
>> 10/18/2016 14:56:11 Map(1/1) switched to CANCELED
>> 10/18/2016 14:56:11 Sink: Unnamed(1/1) switched to CANCELED
>> 10/18/2016 14:56:11 Job execution switched to status FAILED.
>>
>> ------------------------------------------------------------
>>  The program finished with the following exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The program
>> execution failed: Job execution failed.
>> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:405)
>> at
>> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:95)
>> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378)
>> at
>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:68)
>> at
>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:585)
>> at com.tfs.rtdp.precompute.Flink.FlinkTest$.main(FlinkTest.scala:141)
>> at com.tfs.rtdp.precompute.Flink.FlinkTest.main(FlinkTest.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:497)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:320)
>> at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
>> at
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>> execution failed.
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822)
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
>> at
>> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>> at
>> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: org.apache.flink.runtime.execution.SuppressRestartsException:
>> Unrecoverable failure. This suppresses job restarts. Please check the stack
>> trace for the root cause.
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1305)
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1291)
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1291)
>> at
>> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>> at
>> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> ... 2 more
>> Caused by: java.lang.IllegalStateException: Failed to rollback to savepoint
>> jobmanager://savepoints/1. Cannot map old state for task
>> 4ca3b7c2641c4299a3378fab220c4e5c to the new program. This indicates that the
>> program has been changed in a non-compatible way  after the savepoint.
>> at
>> org.apache.flink.runtime.checkpoint.savepoint.SavepointCoordinator.restoreSavepoint(SavepointCoordinator.java:248)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.restoreSavepoint(ExecutionGraph.java:983)
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1302)
>> ... 10 more
>>
>> Thanks.
>>
>> From: Anirudh Mallem
>> Date: Tuesday, October 18, 2016 at 2:07 PM
>> To: "user@flink.apache.org"
>> Subject: Issue while restarting from SavePoint
>>
>> Hi,
>> I am relatively new to Flink and I was experimenting with the save points
>> feature. I have an HA cluster running with 1 Master and 4 Workers. The
>> flink-config.yaml is as follows :
>>
>> #==============================================================================
>> # Common
>> #==============================================================================
>> jobmanager.rpc.address:
>> stable-stream-master01.app.shared.int.sv2.247-inc.net
>>
>> # The port where the JobManager's main actor system listens for messages.
>> jobmanager.rpc.port: 6123
>>
>> # The heap size for the JobManager JVM
>> jobmanager.heap.mb: 512
>>
>> # The heap size for the TaskManager JVM
>> taskmanager.heap.mb: 2048
>>
>> # The number of task slots that each TaskManager offers. Each slot runs one
>> parallel pipeline.
>> taskmanager.numberOfTaskSlots: 8
>>
>> # Specify whether TaskManager memory should be allocated when starting up
>> (true) or when
>> # memory is required in the memory manager (false)
>> taskmanager.memory.preallocate: false
>>
>> # The parallelism used for programs that did not specify and other
>> parallelism.
>> parallelism.default: 1
>>
>> env.java.home: /usr/local/java
>>
>> restart-strategy: fixed-delay
>> restart-strategy.fixed-delay.attempts: 2
>> restart-strategy.fixed-delay.delay: 10 s
>> #==============================================================================
>> # Web Frontend
>> #==============================================================================
>>
>> # The port under which the web-based runtime monitor listens.
>> # A value of -1 deactivates the web server.
>>
>> jobmanager.web.port: 8081
>>
>> # Flag to specify whether job submission is enabled from the web-based
>> # runtime monitor. Uncomment to disable.
>>
>> #jobmanager.web.submit.enable: false
>>
>> #==============================================================================
>> # Streaming state checkpointing
>> #==============================================================================
>>
>> # The backend that will be used to store operator state checkpoints if
>> # checkpointing is enabled.
>> #
>> # Supported backends: jobmanager, filesystem, <class-name-of-factory>
>> #
>> state.backend: filesystem
>>
>>
>> # Directory for storing checkpoints in a Flink-supported filesystem
>> # Note: State backend must be accessible from the JobManager and all
>> TaskManagers.
>> # Use "hdfs://" for HDFS setups, "file://" for UNIX/POSIX-compliant file
>> systems,
>> # (or any local file system under Windows), or "S3://" for S3 file system.
>> #
>>  state.backend.fs.checkpointdir: file:///home/amallem/
>>  state.savepoints.dir: file:///home/amallem/save/
>>
>> #==============================================================================
>> # Master High Availability (required configuration)
>> #==============================================================================
>>
>> # The list of ZooKepper quorum peers that coordinate the high-availability
>> # setup. This must be a list of the form:
>> # "host1:clientPort,host2[:clientPort],..." (default clientPort: 2181)
>> #
>>  recovery.mode: zookeeper
>> #
>>  recovery.zookeeper.quorum:
>> stable-stream-zookeeper01.app.shared.int.net:2181,stable-stream-zookeeper02.app.shared.int.net:2181
>> #
>> # Note: You need to set the state backend to 'filesystem' and the checkpoint
>> # directory (see above) before configuring the storageDir.
>> #
>>  recovery.zookeeper.storageDir: file:///home/amallem/recovery
>>  recovery.zookeeper.path.root: /flink
>>  recovery.zookeeper.path.namespace: /cluster_one
>>
>> Query : I am able to take a save point and then cancel my current job but
>> when I try to start the job again using the save point I get the error
>> stating “ProgramInvocationException: JobManager did not respond within
>> 60000ms”. I checked Zookeeper and everything seems fine. I also followed the
>> recommendations in the following post :
>> http://stackoverflow.com/questions/36625742/cant-deploy-flow-to-ha-cluster-of-apache-flink-using-flink-cli
>>  Is there any configuration I am missing to enable restarting of the job.
>> Any help will be appreciated. Thanks.
>>
>> Regards,
>> Anirudh

Re: Issue while restarting from SavePoint

Posted by Ufuk Celebi <uc...@apache.org>.
Hey Anirudh!

As you say, this looks like two issues:

(1) In detached mode the configuration seems to be not picked up
correctly. That should be independent of the savepoints. Can you
confirm this?

(2) The program was changed in a non-compatible way after the
savepoint. Did you change the program and if yes in which way?

– Ufuk


On Wed, Oct 19, 2016 at 12:01 AM, Anirudh Mallem
<an...@247-inc.com> wrote:
> Hi,
> The issue seems to be connected with trying to restart the job in the
> detached mode. The stack trace is as follows:
>
> -bash-3.2$ bin/flink run -d -s jobmanager://savepoints/1 -c
> com.tfs.rtdp.precompute.Flink.FlinkTest
> /tmp/flink-web-upload-2155906f-be54-47f3-b9f7-7f6f0f54f74b/448724f9-f69f-455f-99b9-c57289657e29_uber-flink-test-1.0-SNAPSHOT.jar
> file:///home/amallem/capitalone.properties
> Cluster configuration: Standalone cluster with JobManager at
> /10.64.119.90:33167
> Using address 10.64.119.90:33167 to connect to JobManager.
> JobManager web interface address http://10.64.119.90:8081
> Starting execution of program
> Submitting Job with JobID: 6c5596772627d3c9366deaa0c47ab0ad. Returning after
> job submission.
>
> ------------------------------------------------------------
>  The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: JobManager did not respond within 60000 milliseconds
> at
> org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:432)
> at
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:93)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378)
> at
> org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:75)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:323)
> at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
> at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
> Caused by: org.apache.flink.runtime.client.JobTimeoutException: JobManager
> did not respond within 60000 milliseconds
> at
> org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:227)
> at
> org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:429)
> ... 8 more
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
> [60000 milliseconds]
> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
> at
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> at scala.concurrent.Await$.result(package.scala:107)
> at scala.concurrent.Await.result(package.scala)
> at
> org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:223)
> ... 9 more
>
> When running it without the detached mode, the job manager is responding but
> the job is failing as it thinks that the structure of the job is modified.
>
> -bash-3.2$ bin/flink run -s jobmanager://savepoints/1 -c
> com.tfs.rtdp.precompute.Flink.FlinkTest
> /tmp/flink-web-upload-2155906f-be54-47f3-b9f7-7f6f0f54f74b/448724f9-f69f-455f-99b9-c57289657e29_uber-flink-test-1.0-SNAPSHOT.jar
> file:///home/amallem/capitalone.properties
> Cluster configuration: Standalone cluster with JobManager at
> /10.64.119.90:33167
> Using address 10.64.119.90:33167 to connect to JobManager.
> JobManager web interface address http://10.64.119.90:8081
> Starting execution of program
> Submitting job with JobID: fe7bf69676a5127eecb392e3a0743c6d. Waiting for job
> completion.
> Connected to JobManager at
> Actor[akka.tcp://flink@10.64.119.90:33167/user/jobmanager#-2047134036]
> 10/18/2016 14:56:11 Job execution switched to status FAILING.
> org.apache.flink.runtime.execution.SuppressRestartsException: Unrecoverable
> failure. This suppresses job restarts. Please check the stack trace for the
> root cause.
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1305)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1291)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1291)
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalStateException: Failed to rollback to savepoint
> jobmanager://savepoints/1. Cannot map old state for task
> 4ca3b7c2641c4299a3378fab220c4e5c to the new program. This indicates that the
> program has been changed in a non-compatible way  after the savepoint.
> at
> org.apache.flink.runtime.checkpoint.savepoint.SavepointCoordinator.restoreSavepoint(SavepointCoordinator.java:248)
> at
> org.apache.flink.runtime.executiongraph.ExecutionGraph.restoreSavepoint(ExecutionGraph.java:983)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1302)
> ... 10 more
> 10/18/2016 14:56:11 Source: Custom Source(1/1) switched to CANCELED
> 10/18/2016 14:56:11 Map(1/1) switched to CANCELED
> 10/18/2016 14:56:11 Filter(1/1) switched to CANCELED
> 10/18/2016 14:56:11 Map(1/1) switched to CANCELED
> 10/18/2016 14:56:11 Map(1/1) switched to CANCELED
> 10/18/2016 14:56:11 Map(1/1) switched to CANCELED
> 10/18/2016 14:56:11 Sink: Unnamed(1/1) switched to CANCELED
> 10/18/2016 14:56:11 Job execution switched to status FAILED.
>
> ------------------------------------------------------------
>  The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Job execution failed.
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:405)
> at
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:95)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378)
> at
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:68)
> at
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:585)
> at com.tfs.rtdp.precompute.Flink.FlinkTest$.main(FlinkTest.scala:141)
> at com.tfs.rtdp.precompute.Flink.FlinkTest.main(FlinkTest.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:320)
> at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
> at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.execution.SuppressRestartsException:
> Unrecoverable failure. This suppresses job restarts. Please check the stack
> trace for the root cause.
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1305)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1291)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1291)
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> ... 2 more
> Caused by: java.lang.IllegalStateException: Failed to rollback to savepoint
> jobmanager://savepoints/1. Cannot map old state for task
> 4ca3b7c2641c4299a3378fab220c4e5c to the new program. This indicates that the
> program has been changed in a non-compatible way  after the savepoint.
> at
> org.apache.flink.runtime.checkpoint.savepoint.SavepointCoordinator.restoreSavepoint(SavepointCoordinator.java:248)
> at
> org.apache.flink.runtime.executiongraph.ExecutionGraph.restoreSavepoint(ExecutionGraph.java:983)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1302)
> ... 10 more
>
> Thanks.
>
> From: Anirudh Mallem
> Date: Tuesday, October 18, 2016 at 2:07 PM
> To: "user@flink.apache.org"
> Subject: Issue while restarting from SavePoint
>
> Hi,
> I am relatively new to Flink and I was experimenting with the save points
> feature. I have an HA cluster running with 1 Master and 4 Workers. The
> flink-config.yaml is as follows :
>
> #==============================================================================
> # Common
> #==============================================================================
> jobmanager.rpc.address:
> stable-stream-master01.app.shared.int.sv2.247-inc.net
>
> # The port where the JobManager's main actor system listens for messages.
> jobmanager.rpc.port: 6123
>
> # The heap size for the JobManager JVM
> jobmanager.heap.mb: 512
>
> # The heap size for the TaskManager JVM
> taskmanager.heap.mb: 2048
>
> # The number of task slots that each TaskManager offers. Each slot runs one
> parallel pipeline.
> taskmanager.numberOfTaskSlots: 8
>
> # Specify whether TaskManager memory should be allocated when starting up
> (true) or when
> # memory is required in the memory manager (false)
> taskmanager.memory.preallocate: false
>
> # The parallelism used for programs that did not specify and other
> parallelism.
> parallelism.default: 1
>
> env.java.home: /usr/local/java
>
> restart-strategy: fixed-delay
> restart-strategy.fixed-delay.attempts: 2
> restart-strategy.fixed-delay.delay: 10 s
> #==============================================================================
> # Web Frontend
> #==============================================================================
>
> # The port under which the web-based runtime monitor listens.
> # A value of -1 deactivates the web server.
>
> jobmanager.web.port: 8081
>
> # Flag to specify whether job submission is enabled from the web-based
> # runtime monitor. Uncomment to disable.
>
> #jobmanager.web.submit.enable: false
>
> #==============================================================================
> # Streaming state checkpointing
> #==============================================================================
>
> # The backend that will be used to store operator state checkpoints if
> # checkpointing is enabled.
> #
> # Supported backends: jobmanager, filesystem, <class-name-of-factory>
> #
> state.backend: filesystem
>
>
> # Directory for storing checkpoints in a Flink-supported filesystem
> # Note: State backend must be accessible from the JobManager and all
> TaskManagers.
> # Use "hdfs://" for HDFS setups, "file://" for UNIX/POSIX-compliant file
> systems,
> # (or any local file system under Windows), or "S3://" for S3 file system.
> #
>  state.backend.fs.checkpointdir: file:///home/amallem/
>  state.savepoints.dir: file:///home/amallem/save/
>
> #==============================================================================
> # Master High Availability (required configuration)
> #==============================================================================
>
> # The list of ZooKepper quorum peers that coordinate the high-availability
> # setup. This must be a list of the form:
> # "host1:clientPort,host2[:clientPort],..." (default clientPort: 2181)
> #
>  recovery.mode: zookeeper
> #
>  recovery.zookeeper.quorum:
> stable-stream-zookeeper01.app.shared.int.net:2181,stable-stream-zookeeper02.app.shared.int.net:2181
> #
> # Note: You need to set the state backend to 'filesystem' and the checkpoint
> # directory (see above) before configuring the storageDir.
> #
>  recovery.zookeeper.storageDir: file:///home/amallem/recovery
>  recovery.zookeeper.path.root: /flink
>  recovery.zookeeper.path.namespace: /cluster_one
>
> Query : I am able to take a save point and then cancel my current job but
> when I try to start the job again using the save point I get the error
> stating “ProgramInvocationException: JobManager did not respond within
> 60000ms”. I checked Zookeeper and everything seems fine. I also followed the
> recommendations in the following post :
> http://stackoverflow.com/questions/36625742/cant-deploy-flow-to-ha-cluster-of-apache-flink-using-flink-cli
>  Is there any configuration I am missing to enable restarting of the job.
> Any help will be appreciated. Thanks.
>
> Regards,
> Anirudh

Re: Issue while restarting from SavePoint

Posted by Anirudh Mallem <an...@247-inc.com>.
Hi,
The issue seems to be connected with trying to restart the job in the detached mode. The stack trace is as follows:

-bash-3.2$ bin/flink run -d -s jobmanager://savepoints/1 -c com.tfs.rtdp.precompute.Flink.FlinkTest /tmp/flink-web-upload-2155906f-be54-47f3-b9f7-7f6f0f54f74b/448724f9-f69f-455f-99b9-c57289657e29_uber-flink-test-1.0-SNAPSHOT.jar file:///home/amallem/capitalone.properties
Cluster configuration: Standalone cluster with JobManager at /10.64.119.90:33167
Using address 10.64.119.90:33167 to connect to JobManager.
JobManager web interface address http://10.64.119.90:8081
Starting execution of program
Submitting Job with JobID: 6c5596772627d3c9366deaa0c47ab0ad. Returning after job submission.

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: JobManager did not respond within 60000 milliseconds
at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:432)
at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:93)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378)
at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:75)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:323)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: org.apache.flink.runtime.client.JobTimeoutException: JobManager did not respond within 60000 milliseconds
at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:227)
at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:429)
... 8 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [60000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at scala.concurrent.Await.result(package.scala)
at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:223)
... 9 more

When running it without the detached mode, the job manager is responding but the job is failing as it thinks that the structure of the job is modified.

-bash-3.2$ bin/flink run -s jobmanager://savepoints/1 -c com.tfs.rtdp.precompute.Flink.FlinkTest /tmp/flink-web-upload-2155906f-be54-47f3-b9f7-7f6f0f54f74b/448724f9-f69f-455f-99b9-c57289657e29_uber-flink-test-1.0-SNAPSHOT.jar file:///home/amallem/capitalone.properties
Cluster configuration: Standalone cluster with JobManager at /10.64.119.90:33167
Using address 10.64.119.90:33167 to connect to JobManager.
JobManager web interface address http://10.64.119.90:8081
Starting execution of program
Submitting job with JobID: fe7bf69676a5127eecb392e3a0743c6d. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@10.64.119.90:33167/user/jobmanager#-2047134036]
10/18/2016 14:56:11 Job execution switched to status FAILING.
org.apache.flink.runtime.execution.SuppressRestartsException: Unrecoverable failure. This suppresses job restarts. Please check the stack trace for the root cause.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1305)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1291)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1291)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: Failed to rollback to savepoint jobmanager://savepoints/1. Cannot map old state for task 4ca3b7c2641c4299a3378fab220c4e5c to the new program. This indicates that the program has been changed in a non-compatible way  after the savepoint.
at org.apache.flink.runtime.checkpoint.savepoint.SavepointCoordinator.restoreSavepoint(SavepointCoordinator.java:248)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.restoreSavepoint(ExecutionGraph.java:983)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1302)
... 10 more
10/18/2016 14:56:11 Source: Custom Source(1/1) switched to CANCELED
10/18/2016 14:56:11 Map(1/1) switched to CANCELED
10/18/2016 14:56:11 Filter(1/1) switched to CANCELED
10/18/2016 14:56:11 Map(1/1) switched to CANCELED
10/18/2016 14:56:11 Map(1/1) switched to CANCELED
10/18/2016 14:56:11 Map(1/1) switched to CANCELED
10/18/2016 14:56:11 Sink: Unnamed(1/1) switched to CANCELED
10/18/2016 14:56:11 Job execution switched to status FAILED.

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:405)
at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:95)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:68)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:585)
at com.tfs.rtdp.precompute.Flink.FlinkTest$.main(FlinkTest.scala:141)
at com.tfs.rtdp.precompute.Flink.FlinkTest.main(FlinkTest.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:320)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.execution.SuppressRestartsException: Unrecoverable failure. This suppresses job restarts. Please check the stack trace for the root cause.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1305)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1291)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1291)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
... 2 more
Caused by: java.lang.IllegalStateException: Failed to rollback to savepoint jobmanager://savepoints/1. Cannot map old state for task 4ca3b7c2641c4299a3378fab220c4e5c to the new program. This indicates that the program has been changed in a non-compatible way  after the savepoint.
at org.apache.flink.runtime.checkpoint.savepoint.SavepointCoordinator.restoreSavepoint(SavepointCoordinator.java:248)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.restoreSavepoint(ExecutionGraph.java:983)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1302)
... 10 more

Thanks.

From: Anirudh Mallem
Date: Tuesday, October 18, 2016 at 2:07 PM
To: "user@flink.apache.org<ma...@flink.apache.org>"
Subject: Issue while restarting from SavePoint

Hi,
I am relatively new to Flink and I was experimenting with the save points feature. I have an HA cluster running with 1 Master and 4 Workers. The flink-config.yaml is as follows :

#==============================================================================
# Common
#==============================================================================
jobmanager.rpc.address: stable-stream-master01.app.shared.int.sv2.247-inc.net

# The port where the JobManager's main actor system listens for messages.
jobmanager.rpc.port: 6123

# The heap size for the JobManager JVM
jobmanager.heap.mb: 512

# The heap size for the TaskManager JVM
taskmanager.heap.mb: 2048

# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
taskmanager.numberOfTaskSlots: 8

# Specify whether TaskManager memory should be allocated when starting up (true) or when
# memory is required in the memory manager (false)
taskmanager.memory.preallocate: false

# The parallelism used for programs that did not specify and other parallelism.
parallelism.default: 1

env.java.home: /usr/local/java

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2
restart-strategy.fixed-delay.delay: 10 s
#==============================================================================
# Web Frontend
#==============================================================================

# The port under which the web-based runtime monitor listens.
# A value of -1 deactivates the web server.

jobmanager.web.port: 8081

# Flag to specify whether job submission is enabled from the web-based
# runtime monitor. Uncomment to disable.

#jobmanager.web.submit.enable: false

#==============================================================================
# Streaming state checkpointing
#==============================================================================

# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends: jobmanager, filesystem, <class-name-of-factory>
#
state.backend: filesystem


# Directory for storing checkpoints in a Flink-supported filesystem
# Note: State backend must be accessible from the JobManager and all TaskManagers.
# Use "hdfs://" for HDFS setups, "file://" for UNIX/POSIX-compliant file systems,
# (or any local file system under Windows), or "S3://" for S3 file system.
#
 state.backend.fs.checkpointdir: file:///home/amallem/
 state.savepoints.dir: file:///home/amallem/save/

#==============================================================================
# Master High Availability (required configuration)
#==============================================================================

# The list of ZooKepper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2[:clientPort],..." (default clientPort: 2181)
#
 recovery.mode: zookeeper
#
 recovery.zookeeper.quorum: stable-stream-zookeeper01.app.shared.int.net:2181,stable-stream-zookeeper02.app.shared.int.net:2181
#
# Note: You need to set the state backend to 'filesystem' and the checkpoint
# directory (see above) before configuring the storageDir.
#
 recovery.zookeeper.storageDir: file:///home/amallem/recovery
 recovery.zookeeper.path.root: /flink
 recovery.zookeeper.path.namespace: /cluster_one

Query : I am able to take a save point and then cancel my current job but when I try to start the job again using the save point I get the error stating “ProgramInvocationException: JobManager did not respond within 60000ms”. I checked Zookeeper and everything seems fine. I also followed the recommendations in the following post : http://stackoverflow.com/questions/36625742/cant-deploy-flow-to-ha-cluster-of-apache-flink-using-flink-cli
 Is there any configuration I am missing to enable restarting of the job. Any help will be appreciated. Thanks.

Regards,
Anirudh