You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Elias Levy (JIRA)" <ji...@apache.org> on 2018/10/04 19:57:00 UTC

[jira] [Closed] (FLINK-10483) Can't restore from a savepoint even with Allow Non Restored State enabled

     [ https://issues.apache.org/jira/browse/FLINK-10483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Elias Levy closed FLINK-10483.
------------------------------
    Resolution: Invalid

> Can't restore from a savepoint even with Allow Non Restored State enabled
> -------------------------------------------------------------------------
>
>                 Key: FLINK-10483
>                 URL: https://issues.apache.org/jira/browse/FLINK-10483
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing, Type Serialization System
>    Affects Versions: 1.4.2
>            Reporter: Elias Levy
>            Priority: Major
>
> A trimmed streaming job fails a restore from a savepoint with an Unloadable class for type serializer error, even though the case class in question has been eliminated from the job and Allow Non Restored State is enabled.
> We have a job running on a Flink 1.4.2 cluster with two Kafka input streams, one of the streams is processed by an async function, and the output of the async function and the other original stream are consumed by a CoProcessOperator, that intern emits Scala case class instances, that go into a stateful ProcessFunction filter, and then into a sink.  I.e.
> {code:java}
> source 1 -> async function --\
>                                                |---> co process --> process --> sink
> source 2 --------------------------/
> {code}
> I eliminated most of the DAG, leaving only the source 1 --> async function portion of it.  This removed the case class in question from the processing graph.  When I try to restore from the savepoint, even if Allow Non Restored State is selected, the job fails to restore with the error "Deserialization of serializer erroed".
> This is the error being generated:
> {noformat}
> WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  - Deserialization of serializer errored; replacing with null.
> java.io.IOException: Unloadable class for type serializer.
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
> 	at org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
> 	at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
> 	at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> 	at java.lang.Thread.run(Unknown Source)
> Caused by: java.io.InvalidClassException: failed to read class descriptor
> 	at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
> 	at java.io.ObjectInputStream.readClassDesc(Unknown Source)
> 	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
> 	at java.io.ObjectInputStream.readObject0(Unknown Source)
> 	at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
> 	at java.io.ObjectInputStream.readSerialData(Unknown Source)
> 	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
> 	at java.io.ObjectInputStream.readObject0(Unknown Source)
> 	at java.io.ObjectInputStream.readObject(Unknown Source)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
> 	... 14 more
> Caused by: java.lang.ClassNotFoundException: com.somewhere.TestJob$$anon$13$$anon$3
> 	at java.net.URLClassLoader.findClass(Unknown Source)
> 	at java.lang.ClassLoader.loadClass(Unknown Source)
> 	at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
> 	at java.lang.ClassLoader.loadClass(Unknown Source)
> 	at java.lang.Class.forName0(Native Method)
> 	at java.lang.Class.forName(Unknown Source)
> 	at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
> 	at org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:204)
> 	... 24 more
> WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  - Deserialization of serializer errored; replacing with null.
> java.io.IOException: Unloadable class for type serializer.
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
> 	at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:445)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:250)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:206)
> 	at org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
> 	at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
> 	at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> 	at java.lang.Thread.run(Unknown Source)
> Caused by: java.io.InvalidClassException: failed to read class descriptor
> 	at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
> 	at java.io.ObjectInputStream.readClassDesc(Unknown Source)
> 	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
> 	at java.io.ObjectInputStream.readObject0(Unknown Source)
> 	at java.io.ObjectInputStream.readObject(Unknown Source)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
> 	... 18 more
> Caused by: java.lang.ClassNotFoundException: com.somewhere.TestJob$$anon$13$$anon$3
> 	at java.net.URLClassLoader.findClass(Unknown Source)
> 	at java.lang.ClassLoader.loadClass(Unknown Source)
> 	at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
> 	at java.lang.ClassLoader.loadClass(Unknown Source)
> 	at java.lang.Class.forName0(Native Method)
> 	at java.lang.Class.forName(Unknown Source)
> 	at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
> 	at org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:204)
> 	... 24 more
> ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Error during disposal of stream operator.
> java.lang.NullPointerException
> 	at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.stopResources(AsyncWaitOperator.java:353)
> 	at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.dispose(AsyncWaitOperator.java:330)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> 	at java.lang.Thread.run(Unknown Source)
> INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Kafka Topic -> Async Function (1/1) (1de078fb77acdd16b7e021fb3e70339f) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize operator state backend.
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> 	at java.lang.Thread.run(Unknown Source)
> Caused by: java.io.IOException: Unable to restore operator state [_async_wait_operator_state_]. The previous serializer of the operator state must be present; the serializer could have been removed from the classpath, or its implementation have changed and could not be loaded. This is a temporary restriction that will be fixed in future versions.
> 	at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:367)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
> 	... 6 more
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)