Posted to issues@flink.apache.org by "Arvid Heise (Jira)" <ji...@apache.org> on 2021/01/11 15:04:00 UTC

[jira] [Comment Edited] (FLINK-20376) Error in restoring checkpoint/savepoint when Flink is upgraded from 1.9 to 1.11.2

    [ https://issues.apache.org/jira/browse/FLINK-20376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17262702#comment-17262702 ] 

Arvid Heise edited comment on FLINK-20376 at 1/11/21, 3:03 PM:
---------------------------------------------------------------

I think you misunderstood my statement.
{noformat}
Did you give manual ids to all operators (only then we guarantee that things can be restored)?
{noformat}

{noformat}
Arvid Heise No. Manual ids have not been given to any operators. We have given them only in one section, but the job name is the same for both the 1.9 and 1.10.2 versions, so I think it should not matter. All are DataStream APIs; we are not using any Table APIs.
{noformat}

So let me quote the [savepoint documentation|https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#assigning-operator-ids]:

{noformat}
It is highly recommended that you adjust your programs as described in this section in order to be able to upgrade your programs in the future. The main required change is to manually specify operator IDs via the uid(String) method. These IDs are used to scope the state of each operator.
{noformat}

I suspect that the automatically generated IDs are incompatible between the two versions; they are not guaranteed to be stable.

So I propose that you retry the migration with uids set on your operators both before and after the upgrade.
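
For illustration, here is a minimal sketch in the Scala DataStream API of what assigning explicit uids looks like (the source, transformation, and uid strings are made up for this sketch, not taken from your job); the same uid strings have to be present both in the job that takes the savepoint and in the job that restores it:
{noformat}
import org.apache.flink.streaming.api.scala._

object UidExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env
      .socketTextStream("localhost", 9999)   // hypothetical source
      .uid("raw-events-source")              // stable id used to map savepoint state
      .map(_.toUpperCase)                    // hypothetical transformation
      .uid("event-normalizer")
      .print()
      .uid("stdout-sink")

    env.execute("uid-example")
  }
}
{noformat}
Once every stateful operator carries a fixed uid, savepoint state is matched by that uid rather than by the auto-generated hash, so it is no longer affected by changes in how those hashes are computed between Flink versions.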


> Error in restoring checkpoint/savepoint when Flink is upgraded from 1.9 to 1.11.2
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-20376
>                 URL: https://issues.apache.org/jira/browse/FLINK-20376
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>            Reporter: Partha Pradeep Mishra
>            Priority: Major
>         Attachments: image-2020-12-10-15-04-39-624.png, image-2020-12-10-15-06-48-013.png, image-2020-12-10-15-09-13-527.png
>
>
> We tried to save checkpoints for one of our Flink jobs (version 1.9) and then import/restore those checkpoints in the newer Flink version (1.11.2). The import/resume operation failed with the error below. Please note that both jobs (i.e. the one running on 1.9 and the one on 1.11.2) are the same binary, with no code differences and no newly introduced operators. Still, we got the issue below.
> _Cannot map checkpoint/savepoint state for operator fbb4ef531e002f8fb3a2052db255adf5 to the new program, because the operator is not available in the new program._
> *Complete Stack Trace:*
> {"errors":["org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:103)\n\tat java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)\n\tat java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)\n\tat java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)\n\tat java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)\n\tat java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not execute application.\n\tat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)\n\tat java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)\n\tat java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)\n\t... 7 more\nCaused by: org.apache.flink.util.FlinkRuntimeException: Could not execute application.\n\tat org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:81)\n\tat org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100)\n\tat java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)\n\t... 7 more\nCaused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute job 'ST1_100Services-preprod-Tumbling-ProcessedBased'.\n\tat org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)\n\tat org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)\n\tat org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)\n\tat org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)\n\t... 
10 more\nCaused by: org.apache.flink.util.FlinkException: Failed to execute job 'ST1_100Services-preprod-Tumbling-ProcessedBased'.\n\tat org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1821)\n\tat org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)\n\tat org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)\n\tat org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697)\n\tat org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:699)\n\tat com.man.ceon.cep.jobs.AnalyticService$.main(AnalyticService.scala:108)\n\tat com.man.ceon.cep.jobs.AnalyticService.main(AnalyticService.scala)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)\n\t... 13 more\nCaused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.\n\tat org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:344)\n\tat java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)\n\tat java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)\n\tat java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)\n\tat akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)\n\tat akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)\n\tat akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)\n\tat akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)\n\tat akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)\n\tat akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)\nCaused by: org.apache.flink.runtime.client.JobExecutionException: Could not instantiate JobManager.\n\tat org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398)\n\tat java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)\n\t... 6 more\nCaused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint s3://prodv2-flink-cluster/savepoints/savepoint-b76d18-d302cc7ca666. Cannot map checkpoint/savepoint state for operator fbb4ef531e002f8fb3a2052db255adf5 to the new program, because the operator is not available in the new program. 
If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.\n\tat org.apache.flink.runtime.checkpoint.Checkpoints.throwNonRestoredStateException(Checkpoints.java:210)\n\tat org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:180)\n\tat org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1397)\n\tat org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:300)\n\tat org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:253)\n\tat org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:229)\n\tat org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119)\n\tat org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)\n\tat org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284)\n\tat org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:272)\n\tat org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)\n\tat org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)\n\tat org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:140)\n\tat org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)\n\tat org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388)\n\t... 7 more\n"]}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)