Posted to user@flink.apache.org by Elias Levy <fe...@gmail.com> on 2018/10/01 23:09:56 UTC

Re: Deserialization of serializer errored

Any of the Flink folks seen this before?

On Fri, Sep 28, 2018 at 5:23 PM Elias Levy <fe...@gmail.com>
wrote:

> I am experiencing a rather odd error.  We have a job running on a Flink
> 1.4.2 cluster with two Kafka input streams.  One of the streams is processed
> by an async function, and the output of the async function and the other
> original stream are consumed by a CoProcessOperator, which in turn emits
> Scala case class instances that go into a stateful ProcessFunction filter
> and then into a sink.  I.e.
>
> source 1 -> async function --\
>                               |--> co process --> process --> sink
> source 2 --------------------/
>
> A field was added to the output case class and the job would no longer start
> up from a savepoint.  I assumed this was the result of a serializer
> incompatibility.  I verified this by reverting the addition of the field,
> after which the job could restore from the previous savepoint.  So far it
> makes sense.
>
> Then I decided to leave the new field in the case class, but eliminated
> most of the DAG, leaving only the source 1 --> async function portion of
> it.  The case class is emitted by the co process.  So this removed the
> modified case class from the processing graph.  When I try to restore from
> the savepoint, even if Allow Non Restored State is selected, the job fails
> to restore with the error "Deserialization of serializer errored".
>
> So then I decided to completely eliminate the modified case class.  I
> removed all trace of it from the job, again only leaving the source 1 ->
> async function.  I tried to restore this job, with no traces of the case
> class, and still the job failed with the "Deserialization of serializer
> errored" error, even when Allow Non Restored State is selected.
>
> Anyone seen anything like this?
>
> This is the error being generated:
>
> WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  - Deserialization of serializer errored; replacing with null.
> java.io.IOException: Unloadable class for type serializer.
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
>     at org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
>     at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Unknown Source)
> Caused by: java.io.InvalidClassException: failed to read class descriptor
>     at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
>     at java.io.ObjectInputStream.readClassDesc(Unknown Source)
>     at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>     at java.io.ObjectInputStream.readObject0(Unknown Source)
>     at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
>     at java.io.ObjectInputStream.readSerialData(Unknown Source)
>     at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>     at java.io.ObjectInputStream.readObject0(Unknown Source)
>     at java.io.ObjectInputStream.readObject(Unknown Source)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
>     ... 14 more
> Caused by: java.lang.ClassNotFoundException: com.somewhere.TestJob$$anon$13$$anon$3
>     at java.net.URLClassLoader.findClass(Unknown Source)
>     at java.lang.ClassLoader.loadClass(Unknown Source)
>     at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
>     at java.lang.ClassLoader.loadClass(Unknown Source)
>     at java.lang.Class.forName0(Native Method)
>     at java.lang.Class.forName(Unknown Source)
>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
>     at org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:204)
>     ... 24 more
> WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  - Deserialization of serializer errored; replacing with null.
> java.io.IOException: Unloadable class for type serializer.
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
>     at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:445)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:250)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:206)
>     at org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
>     at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Unknown Source)
> Caused by: java.io.InvalidClassException: failed to read class descriptor
>     at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
>     at java.io.ObjectInputStream.readClassDesc(Unknown Source)
>     at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>     at java.io.ObjectInputStream.readObject0(Unknown Source)
>     at java.io.ObjectInputStream.readObject(Unknown Source)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
>     ... 18 more
> Caused by: java.lang.ClassNotFoundException: com.somewhere.TestJob$$anon$13$$anon$3
>     at java.net.URLClassLoader.findClass(Unknown Source)
>     at java.lang.ClassLoader.loadClass(Unknown Source)
>     at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
>     at java.lang.ClassLoader.loadClass(Unknown Source)
>     at java.lang.Class.forName0(Native Method)
>     at java.lang.Class.forName(Unknown Source)
>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
>     at org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:204)
>     ... 24 more
> ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Error during disposal of stream operator.
> java.lang.NullPointerException
>     at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.stopResources(AsyncWaitOperator.java:353)
>     at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.dispose(AsyncWaitOperator.java:330)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Unknown Source)
> INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Kafka Topic -> Async Function (1/1) (1de078fb77acdd16b7e021fb3e70339f) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize operator state backend.
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Unknown Source)
> Caused by: java.io.IOException: Unable to restore operator state [_async_wait_operator_state_]. The previous serializer of the operator state must be present; the serializer could have been removed from the classpath, or its implementation have changed and could not be loaded. This is a temporary restriction that will be fixed in future versions.
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:367)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>     ... 6 more
>
>
>
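The trace above bottoms out in a ClassNotFoundException raised while Java
serialization resolves a class name stored in the savepoint. The following is
a minimal, Flink-free sketch of that mechanism, not the actual restore code.
OldSerializer and MissingClassDemo are hypothetical names, and the overridden
resolveClass only simulates a class that is absent from the new job jar.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class MissingClassDemo {
    // Hypothetical stand-in for a serializer object written into a snapshot.
    static class OldSerializer implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    // Java serialization writes the object's class name into the byte stream.
    static byte[] write(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    // Reads the object back while pretending the class named `missing`
    // is no longer on the classpath (assumption: simulated, not a real jar change).
    static Object readWithout(byte[] data, String missing)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws IOException, ClassNotFoundException {
                if (desc.getName().equals(missing)) {
                    throw new ClassNotFoundException(desc.getName());
                }
                return super.resolveClass(desc);
            }
        }) {
            return in.readObject();
        }
    }

    // True if reading fails once the stored class name cannot be resolved.
    static boolean failsWhenMissing() {
        try {
            byte[] data = write(new OldSerializer());
            readWithout(data, OldSerializer.class.getName());
            return false;
        } catch (ClassNotFoundException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // True if the same bytes deserialize fine when the class is still resolvable.
    static boolean restoresWhenPresent() {
        try {
            byte[] data = write(new OldSerializer());
            return readWithout(data, "no.such.Class") instanceof OldSerializer;
        } catch (ClassNotFoundException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("fails when class is missing: " + failsWhenMissing());
        System.out.println("restores when class is present: " + restoresWhenPresent());
    }
}
```

The point of the sketch is only that the bytes themselves are intact; the
read fails because a name recorded at write time no longer maps to a class.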

Re: Deserialization of serializer errored

Posted by Elias Levy <fe...@gmail.com>.
I am wondering if the issue here is that the createTuple2TypeInformation
implicit creates an anonymous class
<https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala#L97-L110>,
which results in a non-stable class name if the code is refactored, so that
the class can no longer be found by that name in a new version of the job.
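As a rough illustration of the hypothesis above, here is a small Java sketch
(Java rather than Scala, and with no Flink types) showing that anonymous
classes get compiler-assigned numbered names. Serializer and AnonymousNames
are hypothetical names. The Scala compiler uses a similar numbering scheme,
visible as $$anon$13$$anon$3 in the trace, so adding or removing an anonymous
class elsewhere in the enclosing class can plausibly shift the numbers.

```java
import java.util.ArrayList;
import java.util.List;

public class AnonymousNames {
    // Hypothetical stand-in for a serializer type; not a Flink interface.
    interface Serializer { }

    // Each `new Serializer() { }` compiles to its own synthetic class whose
    // name is the enclosing class name, a '$', and a compiler-chosen number.
    static List<String> names() {
        List<String> out = new ArrayList<>();
        out.add(new Serializer() { }.getClass().getName());
        out.add(new Serializer() { }.getClass().getName());
        return out;
    }

    public static void main(String[] args) {
        // The numbers depend on how many anonymous classes the compiler has
        // already seen in the enclosing class, not on what each class does.
        names().forEach(System.out::println);
    }
}
```

Nothing in such a name identifies the type being serialized, which is why a
refactoring that leaves the serializer itself untouched could still change the
name stored in an older savepoint.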

On Tue, Oct 2, 2018 at 4:55 PM Elias Levy <fe...@gmail.com>
wrote:

> To add to the mystery, I extracted the class file mentioned in the
> exceptions (TestJob$$anon$13$$anon$3) from the job jar that created the
> savepoint and disassembled it to determine what serializer it is.  The
> serializer actually has nothing to do with the case class that was
> initially modified and then completely removed.  Rather, it was the
> serializer
> org/apache/flink/api/scala/typeutils/CaseClassSerializer<Lscala/Tuple2<Ljava/lang/String;Lme/doubledutch/lazyjson/LazyObject;>;>;
>
> That's the serializer generated by Flink for the source 1 data stream type
> (DataStream[(String, LazyObject)]), which is consumed by the async
> function.  AFAIK there is no reason for there to be any error with that
> serializer.
>
> Thoughts?
>
>
>
> On Tue, Oct 2, 2018 at 12:41 AM Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi Elias,
>>
>> I am not familiar with the recovery code, but Flink might read (some of)
>> the savepoint data even though it is not needed and not loaded into operators.
>> That would explain why you see an exception when the case class is
>> modified or completely removed.
>>
>> Maybe Stefan or Gordon can help here.
>>
>> Best, Fabian

Re: Deserialization of serializer errored

Posted by Elias Levy <fe...@gmail.com>.
To add to the mystery, I extracted the class file mentioned in the
exceptions (TestJob$$anon$13$$anon$3) from the job jar that created the
savepoint and disassembled it to determine what serializer it is.  The
serializer actually has nothing to do with the case class that was
initially modified and then completely removed.  Rather, it was the
serializer
org/apache/flink/api/scala/typeutils/CaseClassSerializer<Lscala/Tuple2<Ljava/lang/String;Lme/doubledutch/lazyjson/LazyObject;>;>;

That's the serializer generated by Flink for the source 1 data stream type
(DataStream[(String, LazyObject)]), which is consumed by the async
function.  AFAIK there is no reason for there to be any error with that
serializer.
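For readers who want to see why the class file had to be disassembled, the
following Java sketch shows the general idea under simplified assumptions:
the name of an anonymous subclass says nothing about what it serializes, while
its generic superclass does. BaseSerializer and AnonymousSupertype are
hypothetical names and are not Flink classes.

```java
import java.lang.reflect.Type;

public class AnonymousSupertype {
    // Hypothetical stand-in for a generic serializer base class.
    static abstract class BaseSerializer<T> { }

    // An anonymous subclass, comparable in spirit to a generated serializer.
    static Object anonymousSerializer() {
        return new BaseSerializer<String>() { };
    }

    // Combines the synthetic class name with the generic superclass that the
    // class file records, which is where the element type becomes visible.
    static String describe(Object o) {
        Type parent = o.getClass().getGenericSuperclass();
        return o.getClass().getName() + " extends " + parent.getTypeName();
    }

    public static void main(String[] args) {
        System.out.println(describe(anonymousSerializer()));
    }
}
```

This mirrors the finding above only loosely: the numbered name is opaque, and
the parameterized supertype is what reveals which data type the class handles.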

Thoughts?



On Tue, Oct 2, 2018 at 12:41 AM Fabian Hueske <fh...@gmail.com> wrote:

> Hi Elias,
>
> I am not familiar with the recovery code, but Flink might read (some of)
> the savepoint data even though it is not needed and not loaded into operators.
> That would explain why you see an exception when the case class is
> modified or completely removed.
>
> Maybe Stefan or Gordon can help here.
>
> Best, Fabian
>>> at
>>> org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
>>> at
>>> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
>>> at
>>> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>> at java.lang.Thread.run(Unknown Source)
>>> Caused by: java.io.InvalidClassException: failed to read class descriptor
>>> at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
>>> at java.io.ObjectInputStream.readClassDesc(Unknown Source)
>>> at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>>> at java.io.ObjectInputStream.readObject0(Unknown Source)
>>> at java.io.ObjectInputStream.readObject(Unknown Source)
>>> at
>>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
>>> ... 18 more
>>> Caused by:
>>> java.lang.ClassNotFoundException: com.somewhere.TestJob$$anon$13$$anon$3
>>> at java.net.URLClassLoader.findClass(Unknown Source)
>>> at java.lang.ClassLoader.loadClass(Unknown Source)
>>> at
>>> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
>>> at java.lang.ClassLoader.loadClass(Unknown Source)
>>> at java.lang.Class.forName0(Native Method)
>>> at java.lang.Class.forName(Unknown Source)
>>> at
>>> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
>>> at
>>> org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:204)
>>> ... 24 more
>>> ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           -
>>> Error during disposal of stream operator.
>>> java.lang.NullPointerException
>>> at
>>> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.stopResources(AsyncWaitOperator.java:353)
>>> at
>>> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.dispose(AsyncWaitOperator.java:330)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>> at java.lang.Thread.run(Unknown Source)
>>> INFO  org.apache.flink.runtime.taskmanager.Task                     -
>>> Source: Kafka Topic -> Async Function (1/1)
>>> (1de078fb77acdd16b7e021fb3e70339f) switched from RUNNING to FAILED.
>>> java.lang.IllegalStateException: Could not initialize operator state
>>> backend.
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>> at java.lang.Thread.run(Unknown Source)
>>> Caused by: java.io.IOException: Unable to restore operator state
>>> [_async_wait_operator_state_]. The previous serializer of the operator
>>> state must be present; the serializer could have been removed from the
>>> classpath, or its implementation have changed and could not be loaded. This
>>> is a temporary restriction that will be fixed in future versions.
>>> at
>>> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:367)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>>> ... 6 more
>>>
>>>
>>>

Re: Deserialization of serializer errored

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Elias,

I am not familiar with the recovery code, but Flink might read (some of)
the savepoint data even though it is neither needed nor loaded into operators.
That would explain why you see an exception when the case class is modified
or completely removed.
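
To make the failure mode in the quoted trace concrete: the exception surfaces inside ObjectInputStream.readObject, called from TypeSerializerSerializationProxy.read, which suggests the savepoint stores the serializer as a Java-serialized object. A rough sketch of what happens when such an object's class can no longer be loaded is below. The names MissingSerializerDemo, OldSerializer and HidingInputStream are made up for illustration and are not Flink classes; the "missing" class is only simulated by overriding resolveClass.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class MissingSerializerDemo {

    /** Hypothetical stand-in for a serializer class that was written into a savepoint. */
    public static class OldSerializer implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    /** Java-serializes the object into a byte array. */
    public static byte[] write(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    /** ObjectInputStream that pretends one class is no longer on the classpath. */
    public static class HidingInputStream extends ObjectInputStream {
        private final String hidden;

        public HidingInputStream(InputStream in, String hidden) throws IOException {
            super(in);
            this.hidden = hidden;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            // Simulate the class having been removed or renamed in the new job jar.
            if (desc.getName().equals(hidden)) {
                throw new ClassNotFoundException(desc.getName());
            }
            return super.resolveClass(desc);
        }
    }

    /** Tries to read the object back and reports what happened as a string. */
    public static String tryRestore(byte[] data, String hiddenClass) throws IOException {
        try (ObjectInputStream in =
                new HidingInputStream(new ByteArrayInputStream(data), hiddenClass)) {
            Object restored = in.readObject();
            return "restored " + restored.getClass().getSimpleName();
        } catch (ClassNotFoundException e) {
            return "ClassNotFoundException: " + e.getMessage();
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] saved = write(new OldSerializer());
        // Class still loadable: the object is read back.
        System.out.println(tryRestore(saved, "none"));
        // Class hidden: reading fails even if nobody would have used the object.
        System.out.println(tryRestore(saved, OldSerializer.class.getName()));
    }
}
```

The point of the sketch is only that the read itself fails once the class name stored in the stream cannot be resolved, regardless of whether the restored object would be used afterwards.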

Maybe Stefan or Gordon can help here.

Best, Fabian

On Tue, Oct 2, 2018 at 01:10, Elias Levy <
fearsome.lucidity@gmail.com> wrote:

> Any of the Flink folks seen this before?
>
> On Fri, Sep 28, 2018 at 5:23 PM Elias Levy <fe...@gmail.com>
> wrote:
>
>> I am experiencing a rather odd error.  We have a job running on a Flink
>> 1.4.2 cluster with two Kafka input streams.  One of the streams is processed
>> by an async function, and the output of the async function and the other
>> original stream are consumed by a CoProcessOperator, which in turn emits
>> Scala case class instances that go into a stateful ProcessFunction filter,
>> and then into a sink.  I.e.
>>
>> source 1 -> async function --\
>>                               |---> co process --> process --> sink
>> source 2 --------------------/
>>
>> A field was added to the output case class and the job would no longer start
>> up from a savepoint.  I assumed this was a result of a serializer
>> incompatibility.  I verified this by reverting the addition of the field
>> and the job could then restore from the previous savepoint.  So far it
>> makes sense.
>>
>> Then I decided to leave the new field in the case class, but eliminated
>> most of the DAG, leaving only the source 1 --> async function portion of
>> it.  The case class is emitted by the co process.  So this removed the
>> modified case class from the processing graph.  When I try to restore from
>> the savepoint, even if Allow Non Restored State is selected, the job fails
>> to restore with the error "Deserialization of serializer errored".
>>
>> So then I decided to completely eliminate the modified case class.  I
>> removed all trace of it from the job, again only leaving the source 1 ->
>> async function.  I tried to restore this job, with no traces of the case
>> class, and still the job failed with the "Deserialization of serializer
>> errored" even when Allow Non Restored State is selected.
>>
>> Anyone seen anything like this?
>>
>> This is the error being generated:
>>
>> WARN
>>  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  -
>> Deserialization of serializer errored; replacing with null.
>> java.io.IOException: Unloadable class for type serializer.
>> at
>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
>> at
>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
>> at
>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
>> at
>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
>> at
>> org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
>> at
>> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
>> at
>> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Unknown Source)
>> Caused by: java.io.InvalidClassException: failed to read class descriptor
>> at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
>> at java.io.ObjectInputStream.readClassDesc(Unknown Source)
>> at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>> at java.io.ObjectInputStream.readObject0(Unknown Source)
>> at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
>> at java.io.ObjectInputStream.readSerialData(Unknown Source)
>> at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>> at java.io.ObjectInputStream.readObject0(Unknown Source)
>> at java.io.ObjectInputStream.readObject(Unknown Source)
>> at
>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
>> ... 14 more
>> Caused by: java.lang.ClassNotFoundException:
>> com.somewhere.TestJob$$anon$13$$anon$3
>> at java.net.URLClassLoader.findClass(Unknown Source)
>> at java.lang.ClassLoader.loadClass(Unknown Source)
>> at
>> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
>> at java.lang.ClassLoader.loadClass(Unknown Source)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Unknown Source)
>> at
>> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
>> at
>> org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:204)
>> ... 24 more
>> WARN
>>  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  -
>> Deserialization of serializer errored; replacing with null.
>> java.io.IOException: Unloadable class for type serializer.
>> at
>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
>> at
>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
>> at
>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
>> at
>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
>> at
>> org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71)
>> at
>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:445)
>> at
>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:250)
>> at
>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:206)
>> at
>> org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
>> at
>> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
>> at
>> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Unknown Source)
>> Caused by: java.io.InvalidClassException: failed to read class descriptor
>> at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
>> at java.io.ObjectInputStream.readClassDesc(Unknown Source)
>> at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>> at java.io.ObjectInputStream.readObject0(Unknown Source)
>> at java.io.ObjectInputStream.readObject(Unknown Source)
>> at
>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
>> ... 18 more
>> Caused by:
>> java.lang.ClassNotFoundException: com.somewhere.TestJob$$anon$13$$anon$3
>> at java.net.URLClassLoader.findClass(Unknown Source)
>> at java.lang.ClassLoader.loadClass(Unknown Source)
>> at
>> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
>> at java.lang.ClassLoader.loadClass(Unknown Source)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Unknown Source)
>> at
>> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
>> at
>> org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:204)
>> ... 24 more
>> ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           -
>> Error during disposal of stream operator.
>> java.lang.NullPointerException
>> at
>> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.stopResources(AsyncWaitOperator.java:353)
>> at
>> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.dispose(AsyncWaitOperator.java:330)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Unknown Source)
>> INFO  org.apache.flink.runtime.taskmanager.Task                     -
>> Source: Kafka Topic -> Async Function (1/1)
>> (1de078fb77acdd16b7e021fb3e70339f) switched from RUNNING to FAILED.
>> java.lang.IllegalStateException: Could not initialize operator state
>> backend.
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Unknown Source)
>> Caused by: java.io.IOException: Unable to restore operator state
>> [_async_wait_operator_state_]. The previous serializer of the operator
>> state must be present; the serializer could have been removed from the
>> classpath, or its implementation have changed and could not be loaded. This
>> is a temporary restriction that will be fixed in future versions.
>> at
>> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:367)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>> ... 6 more
>>
>>
>>