Posted to user@flink.apache.org by Juho Autio <ju...@rovio.com> on 2018/03/16 10:19:36 UTC

Adding a new field in java class -> restore fails with "KryoException: Unable to find class"

Is it possible to add new fields to the object type of a stream, and then
restore from savepoint?

I tried to add a new "private String" field to my Java class, which previously
had a "private String" field and a "private final Map<String, String>" field.
When I tried to restore an old savepoint after this code change, it failed
with "KryoException: Unable to find class".

Is it possible to evolve the stream classes and restore old state after
such changes? For my use case it would be enough if new fields were set to
null when restoring such objects, and if restored values of deleted fields
were simply ignored.
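The behaviour asked for here (new fields restored as null, values of deleted fields ignored) is what a format that stores field names or tags next to the values can provide. As far as I understand, Kryo's default FieldSerializer, which Flink falls back to for such types, writes field values without their names, so a changed field list shifts how the old bytes are interpreted. The following is a minimal stdlib sketch of a name-tagged format, assuming String-valued fields only; it is not Kryo's or Flink's format, and TaggedFieldsSketch, write, read and the field names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Sketch only: a name-tagged record format, NOT Kryo's or Flink's format.
// Every field is written as (name, value), so a reader built against a
// different version of the class can match fields by name.
final class TaggedFieldsSketch {

    // Writes the field count, then each field as name + null flag + value.
    static byte[] write(Map<String, String> fields) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(fields.size());
            for (Map.Entry<String, String> field : fields.entrySet()) {
                out.writeUTF(field.getKey());
                out.writeBoolean(field.getValue() != null);
                if (field.getValue() != null) {
                    out.writeUTF(field.getValue());
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads against the fields the current class version knows about:
    // fields missing from the bytes (newly added) stay null, and fields
    // present in the bytes but unknown (deleted) are read and discarded.
    static Map<String, String> read(byte[] data, Set<String> knownFields) {
        Map<String, String> result = new HashMap<>();
        for (String name : knownFields) {
            result.put(name, null);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                String name = in.readUTF();
                String value = in.readBoolean() ? in.readUTF() : null;
                if (knownFields.contains(name)) {
                    result.put(name, value);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        // Old class version (hypothetical field names): "name" and "legacyField".
        Map<String, String> oldRecord = new LinkedHashMap<>();
        oldRecord.put("name", "click");
        oldRecord.put("legacyField", "x");
        byte[] saved = write(oldRecord);

        // New class version: "legacyField" deleted, "appId" added.
        Set<String> newFields = new HashSet<>(Arrays.asList("name", "appId"));
        Map<String, String> restored = read(saved, newFields);
        System.out.println(restored.get("name"));                // click
        System.out.println(restored.get("appId"));               // null
        System.out.println(restored.containsKey("legacyField")); // false
    }
}
```

The cost of this design is that every record carries its field names, which is why real formats such as Avro instead keep the writer's schema separately and resolve it against the reader's schema.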

Here's a full stack trace:

2018-03-07 08:49:03,072 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - EnrichIdFunction -> AppIdFilter([appsimulator_236e5fb7]) -> DiscardBeforeDateFunction(null) -> DiscardedEventsFunction -> (LateDataLabelFunction, EventMapper -> ThreadPoolGateway (capacity=10) -> Sink: ResponseKafkaSink) (7/8) (66b47839cefef8518605ece669709c65) switched from RUNNING to FAILED.
java.lang.IllegalStateException: Could not initialize operator state backend.
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:330)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:241)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: �mo
Serialization trace:
params (com.rovio.ds.flink.http.Event)
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:203)
	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:48)
	at org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:552)
	at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:368)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:737)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:328)
	... 6 more
Caused by: java.lang.ClassNotFoundException: �mo
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
	... 18 more

I used Flink 1.5-SNAPSHOT for this.



FYI, how I solved this for now:

As a workaround for my case, I reused the Map<String, String> that the object
already had, which is a bit of a hack: I inserted the new field's value into
the map and removed it afterwards to clean it up. This way map.get returned
null for objects restored from the old savepoint and non-null values for any
new instances.



FYI, a failed attempt at solving this:

The only related material I could find was the following (not much help,
but it mentions the Optional annotation):

https://groups.google.com/forum/#!topic/kryo-users/F0FA4GkDg0M
https://stackoverflow.com/questions/39105113/kryo-deserialize-old-version-of-class

If I set @FieldSerializer.Optional("appId") on the new field, Flink was
able to restore from the old savepoint, but the new field was apparently
ignored entirely: it stayed null even for new instances.

Re: Adding a new field in java class -> restore fails with "KryoException: Unable to find class"

Posted by Stephan Ewen <se...@apache.org>.
Hi!

Schema evolution is a bit tricky at the moment. There is a short-term and a
long-term answer to this:

  - Long term: We store the serializer configuration in the snapshots, and want
to use this in the future to offer a path that converts the old format to the
new format (read with the old serializer, pass through a user-specified
converter function, serialize with the new serializer). Two of these three
parts are in place, but it is not fully working at this point.

  - Short term: To be able to evolve state, I would recommend using
something like Avro, which has schema evolution built in. Kryo is
unfortunately particularly bad at class/schema evolution. To use Avro with
your types, pass "new AvroTypeInfo<>(MyClass.class)" to your state when
creating it. You need to add "flink-avro" as a dependency in your application.
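The long-term path from the first point (read with the old serializer, pass each value through a user-specified converter function, write it with the new serializer) can be illustrated outside Flink. This is a minimal stdlib sketch, not Flink's API: StateMigrationSketch, OldEvent, NewEvent, migrate and both byte formats are hypothetical, and the converter in the usage example simply sets the new appId field to null.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Illustration only, not Flink API: migrate serialized state by reading it with
// the old serializer, converting each record with a user-supplied function, and
// writing the result with the new serializer. All types and formats are hypothetical.
final class StateMigrationSketch {

    static final class OldEvent {
        final String name;
        OldEvent(String name) { this.name = name; }
    }

    static final class NewEvent {
        final String name;
        final String appId; // new field; null for records migrated from old state
        NewEvent(String name, String appId) { this.name = name; this.appId = appId; }
    }

    // Old serializer: one UTF string per record.
    static byte[] writeOldAll(String... names) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            for (String name : names) {
                out.writeUTF(name);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // New serializer: name, then a null flag and the optional appId.
    static void writeNew(DataOutputStream out, NewEvent event) throws IOException {
        out.writeUTF(event.name);
        out.writeBoolean(event.appId != null);
        if (event.appId != null) {
            out.writeUTF(event.appId);
        }
    }

    static List<NewEvent> readNewAll(byte[] data) {
        List<NewEvent> events = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            while (in.available() > 0) {
                String name = in.readUTF();
                String appId = in.readBoolean() ? in.readUTF() : null;
                events.add(new NewEvent(name, appId));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return events;
    }

    // The three steps: read with old serializer -> convert -> write with new serializer.
    static byte[] migrate(byte[] oldBytes, Function<OldEvent, NewEvent> converter) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(oldBytes));
             DataOutputStream out = new DataOutputStream(buffer)) {
            while (in.available() > 0) {
                OldEvent old = new OldEvent(in.readUTF()); // step 1: old format
                NewEvent converted = converter.apply(old); // step 2: user converter
                writeNew(out, converted);                  // step 3: new format
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        byte[] oldState = writeOldAll("click", "view");
        byte[] newState = migrate(oldState, old -> new NewEvent(old.name, null));
        for (NewEvent e : readNewAll(newState)) {
            System.out.println(e.name + " appId=" + e.appId);
        }
    }
}
```

Keeping the converter as a plain function means the migration logic knows nothing about either byte format; that separation is the point of the three-step design described in the reply.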

Best,
Stephan

