Posted to dev@flink.apache.org by Adam Roberts <ad...@gmail.com> on 2021/12/14 16:16:35 UTC

Unable to restore from checkpoint (Flink 1.13.2 and 1.13.0): serial version has changed in InflightDataRescalingDescriptor

Hi everyone,

I'm aware of the Flink compatibility matrix, which is available here:
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/
It suggests that moving from one 1.13.x release to another should work just
fine.

However, if we try to restore a 1.13.0 checkpoint with 1.13.2, we'll get a
problem deserializing because (I think, correct me if I'm way off the
mark!) of this in 1.13.2:

    private static class NoRescalingDescriptor extends InflightDataRescalingDescriptor {
        private static final long serialVersionUID = 1L;

whereas in 1.13.0 it's

    private static class NoRescalingDescriptor extends InflightDataRescalingDescriptor {
        private static final long serialVersionUID = -5544173933105855751L;



Caused by: java.io.InvalidClassException:
org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor$NoRescalingDescriptor;
local class incompatible: stream classdesc serialVersionUID =
-5544173933105855751, local class serialVersionUID = 1
	at java.io.ObjectStreamClass.initNonProxy(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readClassDesc(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
	at java.util.HashMap.readObject(Unknown Source) ~[?:?]
	at jdk.internal.reflect.GeneratedMethodAccessor79.invoke(Unknown Source) ~[?:?]
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
Source) ~[?:?]
	at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
	at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
	at java.util.HashMap.readObject(Unknown Source) ~[?:?]
	at jdk.internal.reflect.GeneratedMethodAccessor79.invoke(Unknown Source) ~[?:?]
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
Source) ~[?:?]
	at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
	at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:593)
~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:59)
~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.retrieveCompletedCheckpoint(DefaultCompletedCheckpointStore.java:298)
~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.recover(DefaultCompletedCheckpointStore.java:151)
~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1513)
~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1476)
~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:134)
~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342)
~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190)
~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:122)
~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132)
~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110)
~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340)
~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317)
~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107)
~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at <unknown class>.get(Unknown Source) ~[?:?]
	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at <unknown class>.get(Unknown Source) ~[?:?]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown
Source) ~[?:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
	at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
Source) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
	at java.lang.Thread.run(Unknown Source) ~[?:?]
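
For anyone who wants to see the failure mode outside Flink, here is a minimal sketch (not Flink code). It assumes a hypothetical class `Descriptor` standing in for NoRescalingDescriptor, and it imitates "bytes written by the older class" by overwriting the serialVersionUID inside the serialized stream. The class and method names are made up for illustration; only the two UID values come from the snippets above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;

class UidMismatchDemo {

    // Hypothetical stand-in for the descriptor class as declared in 1.13.2.
    static class Descriptor implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    // Serializes a Descriptor, then overwrites the serialVersionUID recorded in
    // the stream's class descriptor, imitating data written by another version.
    static byte[] serializeWithUid(long uidInStream) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(new Descriptor());
        }
        byte[] bytes = bos.toByteArray();
        // Stream layout for a single object:
        // magic(2) version(2) TC_OBJECT(1) TC_CLASSDESC(1) nameLength(2) name(n) uid(8)
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        int nameLength = buf.getShort(6) & 0xFFFF;
        buf.putLong(8 + nameLength, uidInStream);
        return bytes;
    }

    // Attempts to deserialize and reports whether the UID check passed.
    static String restore(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            in.readObject();
            return "restored";
        } catch (InvalidClassException e) {
            return "InvalidClassException";
        }
    }

    public static void main(String[] args) throws Exception {
        // Stream UID matches the local class.
        System.out.println(restore(serializeWithUid(1L)));
        // Stream UID is the 1.13.0 value, local class declares 1L.
        System.out.println(restore(serializeWithUid(-5544173933105855751L)));
    }
}
```

The second call fails in the same place as the trace above (ObjectStreamClass.initNonProxy), because Java serialization rejects a stream whose recorded serialVersionUID differs from the local class's value, even when the fields are otherwise identical.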


I'll open a JIRA if we like. I was wondering if we could revert the UID
change in 1.13.4, as obviously that's a super important release, but I'm
thinking about a few users I know who are looking to upgrade from 1.13.0
(which they're currently on). As of now, I think this will prove a hurdle,
and we'll have to use an alternative means (if possible) so they can
"upgrade" their Flink version with their existing jobs.

This is the particular commit where we've noticed the serial version ID
changing:
https://github.com/apache/flink/commit/8327f4486841cd1d6beb05418e6d4206a6f4858b

We'll be experimenting with a savepoint, which will hopefully save the day
when the upgrade happens, but I figured I'd raise it here in case anyone has
seen it before or knows why said change was made. I'm unsure what the ideal
solution is, unfortunately. Perhaps a more fine-grained compatibility matrix
will have to be devised (so 1.13.0 and 1.13.0 is fine, 1.13.1 and 1.13.2 and
1.13.3 is fine, and then anything onwards should be fine... you just can't
go from 1.13.0 unless you use savepoints?).

Many thanks as always,

Re: Unable to restore from checkpoint (Flink 1.13.2 and 1.13.0): serial version has changed in InflightDataRescalingDescriptor

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hey Adam,

We support version upgrades only through a savepoint. We do not support
rolling upgrades or upgrades via a checkpoint store. Please also take a
look at this issue, which was created earlier [1].

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-24621
