Posted to user@flink.apache.org by Dongwon Kim <ea...@gmail.com> on 2021/02/08 13:42:53 UTC

Enabling allowNonRestoredState when resuming from a savepoint causes ClassNotFoundException

Hi,

I have an original job (say v1) and I want to start a new job (say v2) from
a savepoint of v1.

An operator of v1 used to have per-key states of a POJO type, but I want to
remove the states together with the definition of the POJO type.

When I started v2 from a savepoint of v1 with "--allowNonRestoredState",
I got the following exception:

2021-02-08 22:07:28,324 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] -
input-to-idata (5/8) (168520e96e73dbf6c2e3c5342b324764) switched from
RUNNING to FAILED on container_e02_1607261469522_0242_01_000008 @
mobdata-flink-dn29.dakao.io (dataPort=45505).
java.lang.Exception: Exception while creating StreamOperatorStateContext.
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_222]
Caused by: org.apache.flink.util.FlinkException: Could not restore
keyed state backend for
CoBroadcastWithKeyedOperator_692eb9021f6319ecb59ea7c8901de92a_(5/8)
from any of the 1 provided restore options.
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException:
Caught unexpected exception.
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:361)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:93)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	... 9 more
Caused by: java.io.IOException: Could not find class
'com.kakaomobility.drivinghabit.stream.IDataConverter$SpeedAndLoc' in
classpath.
	at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:756)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:731)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.readSnapshotData(PojoSerializerSnapshotData.java:214)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.createFrom(PojoSerializerSnapshotData.java:135)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshot.readSnapshot(PojoSerializerSnapshot.java:129)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:175)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(NestedSerializersSnapshotDelegate.java:178)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.readSnapshot(CompositeTypeSerializerSnapshot.java:171)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:175)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:174)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:145)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:77)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:237)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:184)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:211)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:173)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:157)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:142)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:284)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:93)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	... 9 more
Caused by: java.lang.ClassNotFoundException:
com.kakaomobility.drivinghabit.stream.IDataConverter$SpeedAndLoc
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_222]
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_222]
	at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_222]
	at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at java.lang.Class.forName0(Native Method) ~[?:1.8.0_222]
	at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_222]
	at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:754)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:731)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.readSnapshotData(PojoSerializerSnapshotData.java:214)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.createFrom(PojoSerializerSnapshotData.java:135)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshot.readSnapshot(PojoSerializerSnapshot.java:129)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:175)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(NestedSerializersSnapshotDelegate.java:178)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.readSnapshot(CompositeTypeSerializerSnapshot.java:171)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:175)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:174)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:145)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:77)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:237)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:184)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:211)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:173)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:157)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:142)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:284)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:93)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
	... 9 more


So I made two versions of v2:
1) v2_1: remove only the keyed states, without removing the definition of
the POJO type. It resumes successfully from a savepoint of v1.
2) v2_2: remove both the keyed states and the definition of the POJO type.
I hoped resuming from a savepoint of v2_1 would succeed, but it fails with
the same exception as above.
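A toy Python model of why v2_2 cannot resume from v2_1's savepoint, consistent with the explanation given later in this thread: restoring a keyed backend reads the metadata (the serializer snapshot, and therefore the class name) of every state in the savepoint, and states that were restored but never used are still written into the next savepoint. `KeyedBackend`, `ClassNotFound`, and the dict layout are hypothetical stand-ins, not Flink classes; the real RocksDB backend is far more involved.

```python
class ClassNotFound(Exception):
    """Stands in for java.lang.ClassNotFoundException (toy model)."""


class KeyedBackend:
    """Toy keyed backend: state name -> serializer class name."""

    def __init__(self):
        self.registered = {}

    def restore(self, snapshot, classpath):
        # Every state's metadata is read and its class resolved on restore,
        # whether or not the new job ever declares that state.
        for name, cls in snapshot.items():
            if cls not in classpath:
                raise ClassNotFound(cls)
            self.registered[name] = cls

    def snapshot(self):
        # Restored-but-unused states stay registered, so they are written
        # into the next savepoint too.
        return dict(self.registered)


POJO = "IDataConverter$SpeedAndLoc"
sp1 = {"speedAndLoc": POJO, "importantState": "java.lang.Long"}

# v2_1: the POJO state is no longer used, but the class is still on the classpath.
v2_1 = KeyedBackend()
v2_1.restore(sp1, classpath={POJO, "java.lang.Long"})
sp2 = v2_1.snapshot()
print(POJO in sp2.values())  # True: sp2 still references the POJO class

# v2_2: the class is gone, so restoring from sp2 fails as it did from sp1.
try:
    KeyedBackend().restore(sp2, classpath={"java.lang.Long"})
except ClassNotFound as err:
    print("ClassNotFound:", err)
```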

Q1) Why doesn't the "--allowNonRestoredState" option suppress the
ClassNotFoundException?
Q2) Do I have to keep the definition of the POJO type forever, even though
it is no longer necessary?

Best,

Dongwon
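Q1 hinges on the granularity of the flag. Flink's `--allowNonRestoredState` (short `-n`) lets a job skip savepoint state that belongs to operators no longer present in the new job (matched by operator ID/uid); it does not drop individual states of an operator that still exists, and the stack trace above shows that operator's keyed backend resolving the POJO class while reading state metadata. The toy model below sketches that distinction under these assumptions; `restore_job`, `ClassNotFound`, the uid `keyed-op`, and the dict layout are all hypothetical.

```python
class ClassNotFound(Exception):
    """Stands in for java.lang.ClassNotFoundException (toy model)."""


def restore_job(savepoint, job_uids, classpath, allow_non_restored_state=False):
    """Toy model of resuming a job from a savepoint.

    savepoint: {operator_uid: {state_name: serializer_class_name}}
    job_uids:  operator uids that exist in the new job
    classpath: class names the new job can load
    Returns {operator_uid: restored_states}.
    """
    restored = {}
    for uid, states in savepoint.items():
        if uid not in job_uids:
            # Operator-level matching is the only thing the flag relaxes.
            if not allow_non_restored_state:
                raise RuntimeError(f"state of operator {uid!r} cannot be mapped")
            continue  # the whole operator's state is skipped
        # Matched operator: all of its states are restored, declared or not.
        for cls in states.values():
            if cls not in classpath:
                raise ClassNotFound(cls)
        restored[uid] = dict(states)
    return restored


POJO = "IDataConverter$SpeedAndLoc"
sp1 = {"keyed-op": {"speedAndLoc": POJO, "importantState": "java.lang.Long"}}
v2_classpath = {"java.lang.Long"}  # POJO class deleted in v2

# Same uid: the flag does not help; the POJO class is still resolved.
try:
    restore_job(sp1, {"keyed-op"}, v2_classpath, allow_non_restored_state=True)
except ClassNotFound as err:
    print("ClassNotFound:", err)

# New uid: restore succeeds, but importantState is dropped as well.
print(restore_job(sp1, {"keyed-op-v2"}, v2_classpath, allow_non_restored_state=True))
```

The second call mirrors the uid-change workaround suggested later in the thread: it avoids the error only by discarding every state of the operator, including `importantState`.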

Re: Enabling allowNonRestoredState when resuming from a savepoint causes ClassNotFoundException

Posted by Roman Khachatryan <ro...@apache.org>.
Hi Dongwon,

With the State Processor API, you should be able to create a new snapshot
that doesn't reference the unused classes.

Regards,
Roman
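Conceptually, the State Processor API route produces a new savepoint whose metadata no longer mentions the POJO. In Flink 1.12 this would presumably be a one-off batch job that loads the old savepoint (`Savepoint.load`), reads the state worth keeping (`readKeyedState`), and writes it back under the same operator uid (`removeOperator`/`withOperator`, then `write`); because reading the operator restores all of its state metadata, that job most likely still needs the old POJO class on its classpath. The sketch below models only the effect on the savepoint's metadata; `rewrite_operator_state`, `referenced_classes`, and the dict layout are hypothetical, not Flink APIs.

```python
def rewrite_operator_state(savepoint, uid, keep):
    """Copy `savepoint`, keeping only the states in `keep` for operator `uid`.

    savepoint: {operator_uid: {state_name: serializer_class_name}} (toy layout)
    Other operators are copied unchanged.
    """
    return {
        op: {n: c for n, c in states.items() if op != uid or n in keep}
        for op, states in savepoint.items()
    }


def referenced_classes(savepoint):
    """All serializer classes a restore of `savepoint` would need to load."""
    return {c for states in savepoint.values() for c in states.values()}


POJO = "IDataConverter$SpeedAndLoc"
old_sp = {"keyed-op": {"speedAndLoc": POJO, "importantState": "java.lang.Long"}}

new_sp = rewrite_operator_state(old_sp, "keyed-op", keep={"importantState"})
print(sorted(referenced_classes(new_sp)))  # ['java.lang.Long']
```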


On Tue, Feb 9, 2021 at 3:39 AM Dongwon Kim <ea...@gmail.com> wrote:

> Hi Khachatryan,
>
> Thanks for the explanation and the input!
>
> 1. Use the State Processor API to create a new snapshot [1]
>
> I haven't used it, but does the API prevent the class of a specific
> serializer from being loaded?
>
> 2. If the operator has only this state then changing uid (together with
>> allowNonRestoredState) should help
>
> Unfortunately, I have another per-key state defined on the operator
> that is very important and cannot be abandoned T.T
>
> 3. Probably just changing POJO to an empty class will suffice in your case?
>
> Yeah, I might keep the class definition around for a while.
>
> Best,
>
> Dongwon
>
>
> On Tue, Feb 9, 2021 at 2:35 AM Khachatryan Roman <
> khachatryan.roman@gmail.com> wrote:
>
>> Hi,
>>
>> I'm pulling in Yun Tang, who is familiar with StateBackends and RocksDB
>> in particular.
>>
>> From what I see, the 2nd snapshot (sp2) is built from the same set of
>> states obtained from the starting savepoint/checkpoint (sp1) when writing
>> its metadata. This metadata includes serializer snapshots, including the
>> PojoSerializer for your custom type. On restore, this metadata is read, and
>> the POJO class itself is loaded.
>>
>> I see the following ways to overcome this issue:
>> 1. Use the State Processor API to create a new snapshot [1]
>> 2. If the operator has only this state then changing uid (together with
>> allowNonRestoredState) should help
>> 3. Probably just changing POJO to an empty class will suffice in your
>> case?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>
>> Regards,
>> Roman
>>
>>
>> On Mon, Feb 8, 2021 at 4:31 PM Dongwon Kim <ea...@gmail.com> wrote:
>>
>>> Hi 张静,
>>>
>>>> Q1: By default, a savepoint restore will try to match all state
>>>> back to the restored job. `AllowNonRestoredState` does not prevent all
>>>> state in the savepoint from being restored; it only allows some of it
>>>> to remain unmatched to the restored job. So the
>>>> `ClassNotFoundException` cannot be avoided.
>>>
>>> okay
>>>
>>>> Q2: Not really. After you recover the new job from the savepoint
>>>> (savepoint1) based on (1), you could take a new savepoint (savepoint2),
>>>> then remove the definition of the POJO type. Then you can restore from
>>>> savepoint2.
>>>>
>>> I did it but it ends up with the same ClassNotFoundException :-(
>>>
>>> Exactly what I did:
>>> (1) Trigger sp1 from v1
>>> (2) Start v2-1 (w/ the definition of the POJO but do not use it at all)
>>> from sp1
>>> (3) Trigger sp2 from v2-1
>>> (4) Start v2-2 (w/o the definition of the POJO) from sp2
>>> (5) v2-2 failed with the same ClassNotFoundException regarding the POJO
>>> type
>>>
>>> Should v2-2 successfully start from sp2?
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>> On Mon, Feb 8, 2021 at 11:48 PM 张静 <be...@gmail.com> wrote:
>>>
>>>> Hi, Dongwon,
>>>> Q1: By default, a savepoint restore will try to match all state
>>>> back to the restored job. `AllowNonRestoredState` does not prevent all
>>>> state in the savepoint from being restored; it only allows some of it
>>>> to remain unmatched to the restored job. So the
>>>> `ClassNotFoundException` cannot be avoided.
>>>> Q2: Not really. After you recover the new job from the savepoint
>>>> (savepoint1) based on (1), you could take a new savepoint (savepoint2),
>>>> then remove the definition of the POJO type. Then you can restore from
>>>> savepoint2.
>>>> Please correct me if I'm wrong. Thanks.
>>>>
>>>> Best,
>>>> Beyond1920
>>>>
>>>> On Mon, Feb 8, 2021 at 9:43 PM Dongwon Kim <ea...@gmail.com> wrote:
>>>> >
>>>> > Hi,
>>>> >
>>>> > I have an original job (say v1) and I want to start a new job (say
>>>> v2) from a savepoint of v1.
>>>> >
>>>> > An operator of v1 used to have per-key states of a POJO type, but I
>>>> want to remove the states together with the definition of the POJO type.
>>>> >
>>>> > When I start v2 from a savepoint of v1, I specified
>>>> "--allowNonRestoredState" but  I got the following exception:
>>>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>>> > at
>>>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>>>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>>> > at
>>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
>>>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>>> > at
>>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
>>>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>>> >
>>>> >  ... 9 more
>>>> >
>>>> >
>>>> > So I made two versions of v2:
>>>> > 1) v2_1: remove only the keyed states, without removing the definition of
>>>> the POJO type. I managed to resume it from a savepoint of v1.
>>>> > 2) v2_2: remove both the keyed states and the definition of the POJO type.
>>>> I hoped resuming from a savepoint of v2_1 would succeed, but it fails with
>>>> the same exception as above.
>>>> >
>>>> > Q1) Why doesn't the "--allowNonRestoredState" option suppress the
>>>> ClassNotFoundException?
>>>> > Q2) Do I have to live forever with the definition of the POJO type,
>>>> which is no longer necessary?
>>>> >
>>>> > Best,
>>>> >
>>>> > Dongwon
>>>> >
>>>>
>>>

Re: Enabling allowNonRestoredState when resuming from a savepoint causes ClassNotFoundException

Posted by Dongwon Kim <ea...@gmail.com>.
Hi Khachatryan,

Thanks for the explanation and the input!

> 1. Use the State Processor API to create a new snapshot [1]

I haven't used it, but does the API prevent the class of a specific
serializer from being loaded?

> 2. If the operator has only this state, then changing its uid (together with
> allowNonRestoredState) should help

Unfortunately, I have another per-key state defined on the operator,
which is very important and cannot be abandoned T.T

> 3. Perhaps just changing the POJO to an empty class would suffice in your case?

Yeah, I might keep carrying the class definition for a while.
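A minimal sketch of option 3, under the assumption (Roman's "probably", not verified here) that the restore only needs the class to be resolvable by name. It keeps an empty `SpeedAndLoc` at the fully qualified name from the stack trace, `com.kakaomobility.drivinghabit.stream.IDataConverter$SpeedAndLoc`. The outer class body shown is a stand-in; in the real project only the nested class would be kept inside the existing `IDataConverter`.

```java
package com.kakaomobility.drivinghabit.stream;

// Stand-in outer class for illustration only: in the real code base the
// placeholder below would live inside the existing IDataConverter.
public class IDataConverter {

    /**
     * Empty placeholder, kept solely so that savepoints whose metadata still
     * references "IDataConverter$SpeedAndLoc" can resolve the class by name.
     * Public, static, and with a public no-arg constructor, which is the
     * shape Flink expects of a nested POJO class.
     */
    public static class SpeedAndLoc {
        public SpeedAndLoc() {
        }
    }
}
```

Whether dropping the fields is tolerated is an assumption worth testing against a copy of the savepoint; the reasoning is that v2 never registers or reads that state again, so only the class lookup has to succeed.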

Best,

Dongwon


On Tue, Feb 9, 2021 at 2:35 AM Khachatryan Roman <khachatryan.roman@gmail.com> wrote:

> Hi,
>
> I'm pulling in Yun Tang, who is familiar with state backends and RocksDB in
> particular.
>
> From what I see, the 2nd snapshot (sp2) writes its metadata using the same
> set of states that was obtained from the starting savepoint/checkpoint
> (sp1), including the ones the job no longer uses. This metadata contains the
> serializer snapshots, including the PojoSerializer snapshot for your custom
> type. On restore, this metadata is read, and the POJO class itself is loaded.
>
> I see the following ways to overcome this issue:
> 1. Use the State Processor API to create a new snapshot [1]
> 2. If the operator has only this state, then changing its uid (together with
> allowNonRestoredState) should help
> 3. Perhaps just changing the POJO to an empty class would suffice in your case?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>
> Regards,
> Roman
>
>
> On Mon, Feb 8, 2021 at 4:31 PM Dongwon Kim <ea...@gmail.com> wrote:
>
>> Hi 张静,
>>
>>>     Q1: By default, a savepoint restore will try to match all state
>>> back to the restored job. `AllowNonRestoredState` cannot avoid
>>> recovering all state from the savepoint; it only skips matching all of
>>> the restored state back to the restored job. So the
>>> `ClassNotFoundException` cannot be avoided.
>>
>> okay
>>
>>>    Q2: Not really. After you recover the new job from the savepoint
>>> (savepoint1) based on (1), you could take a new savepoint (savepoint2),
>>> then remove the definition of the POJO type. Then you can restore from
>>> savepoint2.
>>>
>> I did, but it ends up with the same ClassNotFoundException :-(
>>
>> Exactly what I did:
>> (1) Trigger sp1 from v1
>> (2) Start v2-1 (w/ the definition of the POJO, but not using it at all)
>> from sp1
>> (3) Trigger sp2 from v2-1
>> (4) Start v2-2 (w/o the definition of the POJO) from sp2
>> (5) v2-2 failed with the same ClassNotFoundException regarding the POJO
>> type
>>
>> Should v2-2 successfully start from sp2?
>>
>> Best,
>>
>> Dongwon
>>
>>
>>
>>
>>
>>
>> On Mon, Feb 8, 2021 at 11:48 PM 张静 <be...@gmail.com> wrote:
>>
>>> Hi, Dongwon,
>>>      Q1: By default, a savepoint restore will try to match all state
>>> back to the restored job. `AllowNonRestoredState` cannot avoid
>>> recovering all state from the savepoint; it only skips matching all of
>>> the restored state back to the restored job. So the
>>> `ClassNotFoundException` cannot be avoided.
>>>      Q2: Not really. After you recover the new job from the savepoint
>>> (savepoint1) based on (1), you could take a new savepoint (savepoint2),
>>> then remove the definition of the POJO type. Then you can restore from
>>> savepoint2.
>>> Please correct me if I'm wrong. Thanks.
>>>
>>> Best,
>>> Beyond1920
>>>
>>

Re: Enabling allowNonRestoredState when resuming from a savepoint causes ClassNotFoundException

Posted by Khachatryan Roman <kh...@gmail.com>.
Hi,

I'm pulling in Yun Tang, who is familiar with state backends and RocksDB in
particular.

From what I see, the 2nd snapshot (sp2) writes its metadata using the same
set of states that was obtained from the starting savepoint/checkpoint (sp1),
including the ones the job no longer uses. This metadata contains the
serializer snapshots, including the PojoSerializer snapshot for your custom
type. On restore, this metadata is read, and the POJO class itself is loaded.
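The mechanism described above, combined with the point that `allowNonRestoredState` only relaxes matching, can be illustrated with a small toy model. This is plain Java and not Flink code: the `restore` method, the uids `op`/`op-v2` and the class name `com.example.RemovedPojo` are hypothetical, and the real checks are spread across Flink's JobManager and state backends. The model assumes, in line with how the Flink savepoint documentation describes the flag (skipping state of operators that were removed), that only unmatched uids can be skipped; for an operator that is still matched, the class recorded for each of its states is resolved whether or not the new code uses that state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of savepoint restore ordering; NOT Flink's actual implementation. */
public class RestoreOrderModel {

    /**
     * @param savepoint operator uid -> class names recorded in the serializer
     *                  snapshots of that operator's states (hypothetical layout)
     * @param jobUids   operator uids present in the new job
     * @return uids whose state was restored
     */
    public static List<String> restore(Map<String, List<String>> savepoint, Set<String> jobUids,
                                       boolean allowNonRestoredState) throws ClassNotFoundException {
        List<String> restored = new ArrayList<>();
        for (Map.Entry<String, List<String>> op : savepoint.entrySet()) {
            // Step 1 (job level): match savepoint operators to the new job by uid.
            // This is the only step allowNonRestoredState relaxes.
            if (!jobUids.contains(op.getKey())) {
                if (!allowNonRestoredState) {
                    throw new IllegalStateException("Cannot map state of operator " + op.getKey());
                }
                continue; // unmatched operator: none of its states (or classes) are read
            }
            // Step 2 (task level): read the metadata of EVERY state of the matched
            // operator, resolving each recorded class, used by the new code or not.
            for (String className : op.getValue()) {
                Class.forName(className);
            }
            restored.add(op.getKey());
        }
        return restored;
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical savepoint: operator "op" holds a Long-typed state and a
        // state whose POJO class has been deleted from the new jar.
        Map<String, List<String>> sp = new LinkedHashMap<>();
        sp.put("op", Arrays.asList("java.lang.Long", "com.example.RemovedPojo"));

        try {
            restore(sp, Collections.singleton("op"), true);
        } catch (ClassNotFoundException e) {
            // The flag does not help: "op" still matches, so all its states are read.
            System.out.println("same uid: ClassNotFoundException");
        }

        // Changing the uid skips the lookup, but the operator's Long state is dropped too.
        System.out.println("new uid: restored " + restore(sp, Collections.singleton("op-v2"), true));

        // A savepoint rewritten without the POJO state restores under the original uid.
        Map<String, List<String>> rewritten = new LinkedHashMap<>();
        rewritten.put("op", Collections.singletonList("java.lang.Long"));
        System.out.println("rewritten: restored " + restore(rewritten, Collections.singleton("op"), false));
    }
}
```

Running `main` prints one line per scenario: the class lookup failing under the same uid, an empty restore under a new uid (option 2, which also loses the operator's other state), and a clean restore from a rewritten savepoint (option 1).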

I see the following ways to overcome this issue:
1. Use the State Processor API to create a new snapshot [1]
2. If the operator has only this state, then changing its uid (together with
allowNonRestoredState) should help
3. Perhaps just changing the POJO to an empty class would suffice in your case?

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
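A rough sketch of option 1, written against the Flink 1.12 DataSet-based State Processor API (it needs `flink-state-processor-api` and the RocksDB state backend on the classpath). The operator uid, the state name, the `String` key type and the `Long` value type of the state to keep are hypothetical placeholders. Two caveats, both assumptions worth checking: the rewrite job presumably still has to run with the old POJO class on its classpath, because loading the operator's keyed state goes through the same metadata read shown in the stack trace; and `removeOperator` drops everything under that uid, so any broadcast state or timers of this `CoBroadcastWithKeyedOperator` are not carried over by this sketch.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class DropPojoState {

    // Hypothetical placeholders: use the real uid, state name and types of the job.
    static final String OPERATOR_UID = "my-keyed-operator";
    static final String KEPT_STATE = "important-state";

    static ValueStateDescriptor<Long> keptDescriptor() {
        return new ValueStateDescriptor<>(KEPT_STATE, Long.class);
    }

    /** Reads only the state to keep; the POJO-typed state is never registered here. */
    public static class ReadKept extends KeyedStateReaderFunction<String, Tuple2<String, Long>> {
        private transient ValueState<Long> kept;

        @Override
        public void open(Configuration parameters) {
            kept = getRuntimeContext().getState(keptDescriptor());
        }

        @Override
        public void readKey(String key, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
            Long value = kept.value();
            if (value != null) { // skip empty keys; Tuple2 fields cannot be serialized as null
                out.collect(Tuple2.of(key, value));
            }
        }
    }

    /** Writes the kept values back into keyed state with the same descriptor. */
    public static class WriteKept extends KeyedStateBootstrapFunction<String, Tuple2<String, Long>> {
        private transient ValueState<Long> kept;

        @Override
        public void open(Configuration parameters) {
            kept = getRuntimeContext().getState(keptDescriptor());
        }

        @Override
        public void processElement(Tuple2<String, Long> value, Context ctx) throws Exception {
            kept.update(value.f1);
        }
    }

    public static void main(String[] args) throws Exception {
        String oldSavepoint = args[0]; // e.g. sp1
        String newSavepoint = args[1]; // target path for the cleaned savepoint
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Run with the OLD jar so the POJO class named in the metadata can be resolved.
        ExistingSavepoint existing =
                Savepoint.load(env, oldSavepoint, new RocksDBStateBackend(newSavepoint));

        DataSet<Tuple2<String, Long>> keptValues = existing.readKeyedState(OPERATOR_UID, new ReadKept());

        BootstrapTransformation<Tuple2<String, Long>> rewritten = OperatorTransformation
                .bootstrapWith(keptValues)
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> t) {
                        return t.f0;
                    }
                })
                .transform(new WriteKept());

        existing
                .removeOperator(OPERATOR_UID)          // drops ALL state under this uid
                .withOperator(OPERATOR_UID, rewritten) // re-adds only the kept state
                .write(newSavepoint);

        env.execute("rewrite savepoint without the POJO state");
    }
}
```

v2-2 (without the POJO class) would then be started from the path passed as the second argument.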

Regards,
Roman


On Mon, Feb 8, 2021 at 4:31 PM Dongwon Kim <ea...@gmail.com> wrote:

> Hi 张静,
>
>>     Q1: By default, a savepoint restore will try to match all state
>> back to the restored job. `AllowNonRestoredState` cannot avoid
>> recovering all state from the savepoint; it only skips matching all of
>> the restored state back to the restored job. So the
>> `ClassNotFoundException` cannot be avoided.
>
> okay
>
>>    Q2: Not really. After you recover the new job from the savepoint
>> (savepoint1) based on (1), you could take a new savepoint (savepoint2),
>> then remove the definition of the POJO type. Then you can restore from
>> savepoint2.
>>
> I did, but it ends up with the same ClassNotFoundException :-(
>
> Exactly what I did:
> (1) Trigger sp1 from v1
> (2) Start v2-1 (w/ the definition of the POJO, but not using it at all)
> from sp1
> (3) Trigger sp2 from v2-1
> (4) Start v2-2 (w/o the definition of the POJO) from sp2
> (5) v2-2 failed with the same ClassNotFoundException regarding the POJO
> type
>
> Should v2-2 successfully start from sp2?
>
> Best,
>
> Dongwon
>
>
>
>
>
>
> On Mon, Feb 8, 2021 at 11:48 PM 张静 <be...@gmail.com> wrote:
>
>> Hi, Dongwon,
>>      Q1: By default, a savepoint restore will try to match all state
>> back to the restored job. `AllowNonRestoredState` cannot avoid
>> recovering all state from the savepoint; it only skips matching all of
>> the restored state back to the restored job. So the
>> `ClassNotFoundException` cannot be avoided.
>>      Q2: Not really. After you recover the new job from the savepoint
>> (savepoint1) based on (1), you could take a new savepoint (savepoint2),
>> then remove the definition of the POJO type. Then you can restore from
>> savepoint2.
>> Please correct me if I'm wrong. Thanks.
>>
>> Best,
>> Beyond1920
>>
>> Dongwon Kim <ea...@gmail.com> wrote on Mon, Feb 8, 2021 at 9:43 PM:
>> >
>> > Hi,
>> >
>> > I have an original job (say v1) and I want to start a new job (say v2)
>> from a savepoint of v1.
>> >
>> > An operator of v1 used to have per-key states of a POJO type, but I
>> want to remove the states together with the definition of the POJO type.
>> >
>> > When I started v2 from a savepoint of v1, I specified
>> "--allowNonRestoredState", but I got the following exception:
>> >
>> > 2021-02-08 22:07:28,324 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] -
>> input-to-idata (5/8) (168520e96e73dbf6c2e3c5342b324764) switched from
>> RUNNING to FAILED on container_e02_1607261469522_0242_01_000008 @
>> mobdata-flink-dn29.dakao.io (dataPort=45505).
>> > java.lang.Exception: Exception while creating
>> StreamOperatorStateContext.
>> > at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_222]
>> > Caused by: org.apache.flink.util.FlinkException: Could not restore
>> keyed state backend for
>> CoBroadcastWithKeyedOperator_692eb9021f6319ecb59ea7c8901de92a_(5/8) from
>> any of the 1 provided restore options.
>> > at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > ... 9 more
>> > Caused by: org.apache.flink.runtime.state.BackendBuildingException:
>> Caught unexpected exception.
>> > at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:361)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:93)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > ... 9 more
>> > Caused by: java.io.IOException: Could not find class
>> 'com.kakaomobility.drivinghabit.stream.IDataConverter$SpeedAndLoc' in
>> classpath.
>> > at
>> org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:756)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:731)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.readSnapshotData(PojoSerializerSnapshotData.java:214)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.createFrom(PojoSerializerSnapshotData.java:135)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshot.readSnapshot(PojoSerializerSnapshot.java:129)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:175)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(NestedSerializersSnapshotDelegate.java:178)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.readSnapshot(CompositeTypeSerializerSnapshot.java:171)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:175)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:174)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:145)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:77)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:237)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:184)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:211)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:173)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:157)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:142)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:284)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:93)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > ... 9 more
>> > Caused by: java.lang.ClassNotFoundException:
>> com.kakaomobility.drivinghabit.stream.IDataConverter$SpeedAndLoc
>> > at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>> ~[?:1.8.0_222]
>> > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_222]
>> > at
>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_222]
>> > at
>> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at java.lang.Class.forName0(Native Method) ~[?:1.8.0_222]
>> > at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_222]
>> > at
>> org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:754)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:731)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.readSnapshotData(PojoSerializerSnapshotData.java:214)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.createFrom(PojoSerializerSnapshotData.java:135)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshot.readSnapshot(PojoSerializerSnapshot.java:129)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:175)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(NestedSerializersSnapshotDelegate.java:178)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.readSnapshot(CompositeTypeSerializerSnapshot.java:171)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:175)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:174)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:145)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:77)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:237)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:184)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:211)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:173)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:157)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:142)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:284)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:93)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> >
>> >  ... 9 more
>> >
>> >
>> > So I made two versions of v2:
>> > 1) v2_1: removes only the keyed states, keeping the definition of the
>> POJO type. With it, I managed to resume from a savepoint of v1.
>> > 2) v2_2: removes both the keyed states and the definition of the POJO
>> type. I hoped resuming from a savepoint of v2_1 would succeed, but it fails
>> with the same exception as above.
>> >
>> > Q1) Why doesn't the "--allowNonRestoredState" option suppress
>> ClassNotFoundException?
>> > Q2) Do I have to live forever with the definition of the POJO type
>> which is no longer necessary?
>> >
>> > Best,
>> >
>> > Dongwon
>> >
>>
>

Re: Enabling allowNonRestoredState when resuming from a savepoint causes ClassNotFoundException

Posted by Dongwon Kim <ea...@gmail.com>.
Hi 张静,

>     Q1: By default, a savepoint restore tries to match all state
> back to the restored job. `allowNonRestoredState` does not stop the
> restore from reading all of the state in the savepoint; it only skips
> matching every piece of restored state back to the restored job. So the
> `ClassNotFoundException` cannot be avoided.

okay
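To make Q1 concrete, here is a toy model in plain Java. It is a sketch under stated assumptions, not Flink's actual restore code; `RestoreModel`, `StateMeta`, and the operator names are hypothetical. Phase 1 stands for mapping savepoint state onto the operators of the new job, which is the step `--allowNonRestoredState` relaxes: state of an operator that no longer exists is skipped instead of failing the restore. Phase 2 stands for a surviving operator's keyed backend reading the metadata of every state in its snapshot. In the trace above, that read goes through `PojoSerializerSnapshotData.readSnapshotData` and `InstantiationUtil.resolveClassByName` down to `Class.forName`, so the POJO class must be loadable even if the new code never declares the state.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Toy model of a savepoint restore; NOT Flink's implementation.
 * Phase 1: map savepoint state onto the operators of the new job
 *          (the step that allowNonRestoredState relaxes).
 * Phase 2: a surviving operator's keyed backend reads the metadata of
 *          every state it finds, resolving each value class by name.
 */
class RestoreModel {

    /** Hypothetical stand-in for one state entry recorded in a savepoint. */
    static final class StateMeta {
        final String operatorId;
        final String valueClassName;

        StateMeta(String operatorId, String valueClassName) {
            this.operatorId = operatorId;
            this.valueClassName = valueClassName;
        }
    }

    static String restore(List<StateMeta> savepoint, Set<String> newJobOperators,
                          boolean allowNonRestoredState) {
        // Phase 1: state of an operator missing from the new job is an error
        // unless allowNonRestoredState is set, in which case it is skipped.
        for (StateMeta s : savepoint) {
            if (!newJobOperators.contains(s.operatorId) && !allowNonRestoredState) {
                return "FAILED: no operator " + s.operatorId + " in new job";
            }
        }
        // Phase 2: every state of a surviving operator is read, whether or not
        // the new code still declares it, so its class must be loadable.
        for (StateMeta s : savepoint) {
            if (!newJobOperators.contains(s.operatorId)) {
                continue; // state of a removed operator is never read
            }
            try {
                Class.forName(s.valueClassName);
            } catch (ClassNotFoundException e) {
                return "FAILED: ClassNotFoundException " + s.valueClassName;
            }
        }
        return "RESTORED";
    }

    public static void main(String[] args) {
        String pojo = "com.kakaomobility.drivinghabit.stream.IDataConverter$SpeedAndLoc";
        Set<String> v2 = new HashSet<>(Collections.singletonList("input-to-idata"));

        // The operator still exists; only its POJO state was removed from the code.
        List<StateMeta> sameOperator = Arrays.asList(
                new StateMeta("input-to-idata", "java.lang.Long"),
                new StateMeta("input-to-idata", pojo));
        System.out.println(restore(sameOperator, v2, true));

        // The whole operator was removed instead: the flag lets its state be skipped.
        List<StateMeta> removedOperator = Arrays.asList(
                new StateMeta("input-to-idata", "java.lang.Long"),
                new StateMeta("old-operator", pojo));
        System.out.println(restore(removedOperator, v2, false));
        System.out.println(restore(removedOperator, v2, true));
    }
}
```

In this model, dropping the POJO state from an operator that still exists fails with a `ClassNotFoundException` whether or not the flag is set, while dropping a whole operator fails without the flag and restores with it.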

>    Q2: Not really. After you restore the new job from the savepoint
> (savepoint1) based on (1), you can take a new savepoint (savepoint2),
> then remove the definition of the POJO type. Then you can restore from
> savepoint2.
>
I did, but it ends up with the same ClassNotFoundException :-(

Here is exactly what I did:
(1) Trigger sp1 from v1
(2) Start v2-1 (w/ the definition of the POJO but do not use it at all)
from sp1
(3) Trigger sp2 from v2-1
(4) Start v2-2 (w/o the definition of the POJO) from sp2
(5) v2-2 failed with the same ClassNotFoundException regarding the POJO type

Should v2-2 be able to start successfully from sp2?
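Steps (1) to (5) can be sketched as a second toy model, again not Flink's implementation. It assumes, as the failure in step (5) suggests, that state restored from sp1 stays in the keyed backend and is written into sp2 even though v2-1 never accesses it. A savepoint is modeled as a map from state name to value class, and a classpath as a set of class names; `CarryOverModel` and the state names `speedAndLoc` and `otherState` are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/**
 * Toy model of steps (1)-(5); NOT Flink's implementation.
 * Assumption: restored state stays in the keyed backend and is written to
 * the next savepoint even if the job never accesses it.
 */
class CarryOverModel {

    static Map<String, String> runAndSavepoint(Map<String, String> restoredFrom,
                                               Map<String, String> statesUsedByJob,
                                               Set<String> classpath) {
        // Restore: the metadata of every restored state is read, so each class must load.
        for (String className : restoredFrom.values()) {
            if (!classpath.contains(className)) {
                throw new IllegalStateException("ClassNotFoundException: " + className);
            }
        }
        // Restored entries stay in the backend; the job adds the states it uses.
        Map<String, String> backend = new LinkedHashMap<>(restoredFrom);
        backend.putAll(statesUsedByJob);
        // Savepoint: everything in the backend is written out, used or not.
        return new LinkedHashMap<>(backend);
    }

    public static void main(String[] args) {
        String pojo = "com.kakaomobility.drivinghabit.stream.IDataConverter$SpeedAndLoc";
        Set<String> withPojo = new HashSet<>(Arrays.asList(pojo, "java.lang.Long"));
        Set<String> withoutPojo = Collections.singleton("java.lang.Long");

        Map<String, String> v1States = new LinkedHashMap<>();
        v1States.put("speedAndLoc", pojo);             // hypothetical state names
        v1States.put("otherState", "java.lang.Long");
        Map<String, String> v2States = Collections.singletonMap("otherState", "java.lang.Long");

        // (1) sp1 from v1; (2)-(3) v2-1 keeps the class, stops using the state, takes sp2.
        Map<String, String> sp1 = runAndSavepoint(new LinkedHashMap<>(), v1States, withPojo);
        Map<String, String> sp2 = runAndSavepoint(sp1, v2States, withPojo);
        System.out.println("sp2 states: " + sp2.keySet());

        // (4)-(5) v2-2 drops the class and restores from sp2.
        try {
            runAndSavepoint(sp2, v2States, withoutPojo);
        } catch (IllegalStateException e) {
            System.out.println("v2-2 failed: " + e.getMessage());
        }
    }
}
```

Under that assumption, sp2 still contains `speedAndLoc`, so v2-2 fails on the POJO class just as v2_2 did.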

Best,

Dongwon


On Mon, Feb 8, 2021 at 11:48 PM 张静 <be...@gmail.com> wrote:

> Hi, Dongwon,
>      Q1: By default, a savepoint restore tries to match all state
> back to the restored job. `allowNonRestoredState` does not stop the
> restore from reading all of the state in the savepoint; it only skips
> matching every piece of restored state back to the restored job. So the
> `ClassNotFoundException` cannot be avoided.
>      Q2: Not really. After you restore the new job from the savepoint
> (savepoint1) based on (1), you can take a new savepoint (savepoint2),
> then remove the definition of the POJO type. Then you can restore from
> savepoint2.
> Please correct me if I'm wrong. Thanks.
>
> Best,
> Beyond1920
>
> Dongwon Kim <ea...@gmail.com> wrote on Mon, Feb 8, 2021 at 9:43 PM:
> >
> > Hi,
> >
> > I have an original job (say v1) and I want to start a new job (say v2)
> from a savepoint of v1.
> >
> > An operator of v1 used to have per-key states of a POJO type, but I want
> to remove the states together with the definition of the POJO type.
> >
> > When I started v2 from a savepoint of v1, I specified
> "--allowNonRestoredState", but I got the following exception:
> >
> > So I made two versions of v2:
> > 1) v2_1: removes only the keyed states, keeping the definition of the POJO
> type. With it, I managed to resume from a savepoint of v1.
> > 2) v2_2: removes both the keyed states and the definition of the POJO type. I
> hoped resuming from a savepoint of v2_1 would succeed, but it fails with the
> same exception as above.
> >
> > Q1) Why doesn't the "--allowNonRestoredState" option suppress
> ClassNotFoundException?
> > Q2) Do I have to live forever with the definition of the POJO type which
> is no longer necessary?
> >
> > Best,
> >
> > Dongwon
> >
>