Posted to issues@flink.apache.org by "Tzu-Li (Gordon) Tai (JIRA)" <ji...@apache.org> on 2018/11/01 14:05:00 UTC

[jira] [Comment Edited] (FLINK-10493) Macro generated CaseClassSerializer considered harmful

    [ https://issues.apache.org/jira/browse/FLINK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671643#comment-16671643 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-10493 at 11/1/18 2:04 PM:
----------------------------------------------------------------------

[~elevy] [~aljoscha]

This issue is exactly what the 1.7 rework of serializer snapshotting and restoring is targeted at.

To put it briefly: before 1.7, serializers were written via Java serialization alongside the serializer snapshots, and were therefore prone to Java serialization problems such as the one described in this ticket.
Starting with 1.7, once a serializer's snapshot has been upgraded to the new `TypeSerializerSnapshot` interface, the serializer itself is no longer written. Instead, on restore, the snapshot of the prior serializer is used as a factory to reinstantiate the prior serializer.
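
To make the difference concrete, here is a minimal Scala sketch of the "snapshot as factory" idea. It is a toy model, not Flink's API: `SimpleSerializer`, `SimpleSnapshot`, `Person`, `PersonSerializer`, `PersonSnapshot`, `SnapshotDemo` and the `"person-v1"` id are all hypothetical, and the string-keyed registry is an assumption standing in for however Flink actually locates and instantiates a snapshot on restore. Only the snapshot's id and its own small payload are written; the serializer object itself never goes through Java serialization, so in this toy model the serializer's class name plays no part in the restore.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical, simplified model of "snapshot as serializer factory".
// None of these types are Flink's real API.
trait SimpleSerializer[T] {
  def serialize(value: T, out: DataOutputStream): Unit
  def deserialize(in: DataInputStream): T
  def snapshot(): SimpleSnapshot[T]
}

trait SimpleSnapshot[T] {
  // Stable identifier written to the checkpoint instead of the serializer object.
  def id: String
  def writeTo(out: DataOutputStream): Unit
  def readFrom(in: DataInputStream): Unit
  // Rebuilds the serializer that originally wrote the data.
  def restoreSerializer(): SimpleSerializer[T]
}

final case class Person(name: String, age: Int)

final class PersonSerializer extends SimpleSerializer[Person] {
  def serialize(p: Person, out: DataOutputStream): Unit = { out.writeUTF(p.name); out.writeInt(p.age) }
  def deserialize(in: DataInputStream): Person = Person(in.readUTF(), in.readInt())
  def snapshot(): SimpleSnapshot[Person] = new PersonSnapshot
}

final class PersonSnapshot extends SimpleSnapshot[Person] {
  private var fieldCount: Int = 2 // the only state this toy snapshot records
  def id: String = "person-v1"
  def writeTo(out: DataOutputStream): Unit = out.writeInt(fieldCount)
  def readFrom(in: DataInputStream): Unit = { fieldCount = in.readInt() }
  def restoreSerializer(): SimpleSerializer[Person] = {
    require(fieldCount == 2, s"unexpected field count $fieldCount")
    new PersonSerializer
  }
}

object SnapshotDemo {
  // Hypothetical registry: stable id -> fresh, empty snapshot (stand-in for class loading).
  private val registry: Map[String, () => SimpleSnapshot[Person]] =
    Map("person-v1" -> (() => new PersonSnapshot))

  def writeState(ser: SimpleSerializer[Person], values: Seq[Person]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    val snap = ser.snapshot()
    out.writeUTF(snap.id) // the serializer object itself is never Java-serialized
    snap.writeTo(out)
    out.writeInt(values.size)
    values.foreach(v => ser.serialize(v, out))
    out.flush()
    bytes.toByteArray
  }

  def readState(data: Array[Byte]): Seq[Person] = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    val snap = registry(in.readUTF())()
    snap.readFrom(in)
    val prior = snap.restoreSerializer() // the snapshot acts as the factory
    val n = in.readInt()
    Seq.fill(n)(prior.deserialize(in))
  }
}
```

Usage: `SnapshotDemo.readState(SnapshotDemo.writeState(new PersonSerializer, Seq(Person("ada", 36))))` gives back the same `Seq`. In this sketch, what has to stay stable across code changes is the snapshot's identifier and payload format, rather than the serializer's (possibly compiler-generated) class name.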

Unfortunately, the Scala serializers' snapshots have not yet been upgraded to the new interface in the upcoming 1.7. This means that in 1.7 the Scala serializers will still rely on Java serialization to obtain prior serializers, so the issue described here remains.

I think upgrading the Scala serializers (alongside others such as the Pojo serializer) will be a high priority for Flink 1.8, so we should expect this ticket to be fixed by then.


> Macro generated CaseClassSerializer considered harmful
> ------------------------------------------------------
>
>                 Key: FLINK-10493
>                 URL: https://issues.apache.org/jira/browse/FLINK-10493
>             Project: Flink
>          Issue Type: Bug
>          Components: Scala API, State Backends, Checkpointing, Type Serialization System
>    Affects Versions: 1.4.0, 1.4.1, 1.4.2, 1.5.1, 1.5.2, 1.5.3, 1.5.4, 1.6.0, 1.6.1
>            Reporter: Elias Levy
>            Priority: Major
>
> The Flink Scala API uses implicits and macros to generate {{TypeInformation}} and {{TypeSerializer}} objects for types. For Scala tuples and case classes, the macro generates an [anonymous {{CaseClassSerializer}} class|https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala#L148-L161].
> The Scala compiler generates a name for the anonymous class that depends on the position of the macro invocation in the code relative to other anonymous classes. If the code changes such that this relative position changes, the name of the serializer class changes, even if the overall logic of the code and the type in question stay the same.
> If the job is then restored from a savepoint, this results in errors such as the one below: the serializer needed to read the data in the savepoint can no longer be found, because its name has changed.
> At the very least, there should be a prominent warning in the documentation about this issue: minor code changes can result in jobs that cannot restore previous state. Ideally, the use of anonymous classes should be deprecated.
> {noformat}
> WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  - Deserialization of serializer errored; replacing with null.
> java.io.IOException: Unloadable class for type serializer.
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
> 	at org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
> 	at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
> 	at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> 	at java.lang.Thread.run(Unknown Source)
> Caused by: java.io.InvalidClassException: failed to read class descriptor
> 	at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
> 	at java.io.ObjectInputStream.readClassDesc(Unknown Source)
> 	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
> 	at java.io.ObjectInputStream.readObject0(Unknown Source)
> 	at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
> 	at java.io.ObjectInputStream.readSerialData(Unknown Source)
> 	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
> 	at java.io.ObjectInputStream.readObject0(Unknown Source)
> 	at java.io.ObjectInputStream.readObject(Unknown Source)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
> 	... 14 more
> Caused by: java.lang.ClassNotFoundException: com.somewhere.TestJob$$anon$13$$anon$3
> 	at java.net.URLClassLoader.findClass(Unknown Source)
> 	at java.lang.ClassLoader.loadClass(Unknown Source)
> 	at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
> 	at java.lang.ClassLoader.loadClass(Unknown Source)
> 	at java.lang.Class.forName0(Native Method)
> 	at java.lang.Class.forName(Unknown Source)
> 	at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
> 	at org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:204)
> 	... 24 more
> WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  - Deserialization of serializer errored; replacing with null.
> java.io.IOException: Unloadable class for type serializer.
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
> 	at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:445)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:250)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:206)
> 	at org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
> 	at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
> 	at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> 	at java.lang.Thread.run(Unknown Source)
> Caused by: java.io.InvalidClassException: failed to read class descriptor
> 	at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
> 	at java.io.ObjectInputStream.readClassDesc(Unknown Source)
> 	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
> 	at java.io.ObjectInputStream.readObject0(Unknown Source)
> 	at java.io.ObjectInputStream.readObject(Unknown Source)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
> 	... 18 more
> Caused by: java.lang.ClassNotFoundException: com.somewhere.TestJob$$anon$13$$anon$3
> 	at java.net.URLClassLoader.findClass(Unknown Source)
> 	at java.lang.ClassLoader.loadClass(Unknown Source)
> 	at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
> 	at java.lang.ClassLoader.loadClass(Unknown Source)
> 	at java.lang.Class.forName0(Native Method)
> 	at java.lang.Class.forName(Unknown Source)
> 	at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
> 	at org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:204)
> 	... 24 more
> ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Error during disposal of stream operator.
> java.lang.NullPointerException
> 	at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.stopResources(AsyncWaitOperator.java:353)
> 	at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.dispose(AsyncWaitOperator.java:330)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> 	at java.lang.Thread.run(Unknown Source)
> INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Kafka Topic -> Async Function (1/1) (1de078fb77acdd16b7e021fb3e70339f) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize operator state backend.
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> 	at java.lang.Thread.run(Unknown Source)
> Caused by: java.io.IOException: Unable to restore operator state [_async_wait_operator_state_]. The previous serializer of the operator state must be present; the serializer could have been removed from the classpath, or its implementation have changed and could not be loaded. This is a temporary restriction that will be fixed in future versions.
> 	at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:367)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
> 	... 6 more
> {noformat}
>  
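
The naming problem in the description can be reproduced without Flink. The Scala sketch below is an illustration under assumptions: `FakeSerializer` and `AnonymousNames` are hypothetical stand-ins for the macro-generated `CaseClassSerializer`, and the exact `$anon$N` suffixes depend on the compiler version and on where the anonymous classes appear in the source. It shows that each anonymous class gets a compiler-generated `$anon` name, and that looking up a recorded name that no longer exists (here the one from the stack trace) fails with `ClassNotFoundException`, which is the root cause in the log above.

```scala
// Hypothetical stand-in for a macro-generated serializer type.
trait FakeSerializer { def describe: String }

object AnonymousNames {
  // Two anonymous classes; scalac derives their JVM names from their position,
  // e.g. something like AnonymousNames$$anon$1 and AnonymousNames$$anon$2.
  val first: FakeSerializer = new FakeSerializer { def describe: String = "first" }
  val second: FakeSerializer = new FakeSerializer { def describe: String = "second" }

  def names: (String, String) = (first.getClass.getName, second.getClass.getName)

  // Mimics what restore does with a recorded class name: try to load it again.
  def canLoad(className: String): Boolean =
    try { Class.forName(className); true }
    catch { case _: ClassNotFoundException => false }
}
```

Adding another anonymous class before `first` in the source can shift these suffixes, so a name recorded in an old savepoint may point to a different class, or to no class at all.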



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)