Posted to user@flink.apache.org by kla <la...@gmail.com> on 2018/03/13 14:36:43 UTC

State serialization problem when we add a new field in the object

Hi guys,

I have a Flink streaming job running (version 1.2.0) with the
following state:

private transient ValueState<Map<String, Set<User>>> userState;

With the following configuration:

// UserTypeHint (not shown) provides the TypeInformation for Map<String, Set<User>>
final ValueStateDescriptor<Map<String, Set<User>>> descriptor =
        new ValueStateDescriptor<>("userState",
                TypeInformation.of(new UserTypeHint()));
userState = getRuntimeContext().getState(descriptor);

And the User class looks like this:

public class User {

    private String id;

    private String firstName;

    private String lastName;

}

After some time we tried to add one more field to the User class (for
example, emailAddress). But apparently it didn't work; I am getting the following
exception:

2018-03-13 13:26:13,357 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job CountJob (cbada55d435571e8b24313196204f8ab) switched from state RUNNING to FAILING.
com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 58, Size: 4
Serialization trace:
type (com.example.User)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
	at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:82)
	at com.example.TimestampUserCoFlatMapFunction.flatMap1(TimestampUserCoFlatMapFunction.java:33)
	at com.example.TimestampUserCoFlatMapFunction.flatMap1(TimestampUserCoFlatMapFunction.java:27)
	at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement1(CoStreamFlatMap.java:53)
	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:242)
	at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IndexOutOfBoundsException: Index: 58, Size: 4
	at java.util.ArrayList.rangeCheck(ArrayList.java:653)
	at java.util.ArrayList.get(ArrayList.java:429)
	at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
	at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
	at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
	... 15 more


Thanks,
Konstantin



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: State serialization problem when we add a new field in the object

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Konstantin,

What you could do is write an intermediate job that has both the old ValueState, “oldState”,
and a new one, “newState”, that uses the new format.

When an element arrives in this intermediate job, you check whether oldState is empty for that key.
If it is null (empty), you simply process the element as if it were the first time you see the key.
If it is not empty, you apply your migration logic, which ports the oldState to the newState format,
store the migrated state in newState, and delete it from oldState.
After the migration, of course, you process the element as usual, but using only the new state.
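The per-key steps above can be sketched in plain Java. This is not Flink code: as an assumption, each keyed ValueState is modelled as a HashMap from key to value, the per-key value is simplified to a Set of users (instead of the Map<String, Set<User>> from the original job), and the names LazyMigrationSketch, OldUser and NewUser are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Plain-Java sketch of lazy, per-key state migration. Assumption: keyed
 * ValueState is modelled as a Map from key to value; no Flink API is used.
 */
public class LazyMigrationSketch {

    // Hypothetical old state element (three fields).
    static final class OldUser {
        final String id, firstName, lastName;
        OldUser(String id, String firstName, String lastName) {
            this.id = id;
            this.firstName = firstName;
            this.lastName = lastName;
        }
    }

    // Hypothetical new state element with the added emailAddress field.
    static final class NewUser {
        final String id, firstName, lastName, emailAddress;
        NewUser(String id, String firstName, String lastName, String emailAddress) {
            this.id = id;
            this.firstName = firstName;
            this.lastName = lastName;
            this.emailAddress = emailAddress;
        }
    }

    // Stand-ins for the "oldState" and "newState" ValueStates, one value per key.
    final Map<String, Set<OldUser>> oldState = new HashMap<>();
    final Map<String, Set<NewUser>> newState = new HashMap<>();

    // Migration logic for one key; null as the default emailAddress is an assumption.
    static Set<NewUser> migrate(Set<OldUser> old) {
        Set<NewUser> migrated = new HashSet<>();
        for (OldUser u : old) {
            migrated.add(new NewUser(u.id, u.firstName, u.lastName, null));
        }
        return migrated;
    }

    // Called for every element that arrives for the given key.
    void processElement(String key, NewUser element) {
        Set<OldUser> old = oldState.get(key);
        if (old != null) {
            // Old-format state exists: port it, store it in newState, delete the old copy.
            newState.put(key, migrate(old));
            oldState.remove(key);
        }
        // Normal processing uses only the new state (empty for a key seen for the first time).
        newState.computeIfAbsent(key, k -> new HashSet<>()).add(element);
    }

    public static void main(String[] args) {
        LazyMigrationSketch job = new LazyMigrationSketch();
        Set<OldUser> legacy = new HashSet<>();
        legacy.add(new OldUser("u1", "Ada", "Lovelace"));
        job.oldState.put("groupA", legacy);

        job.processElement("groupA", new NewUser("u2", "Alan", "Turing", "alan@example.com"));
        System.out.println(job.oldState.containsKey("groupA")); // false
        System.out.println(job.newState.get("groupA").size());  // 2
    }
}
```

In an actual Flink job, oldState and newState would be two ValueStates registered with different descriptors, and removing the old value would correspond to calling clear() on the old state for the current key.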

If at some point you are sure that you have seen all the keys from the previous version of the code,
then you know that all the old-format states have been migrated, and you can take
a savepoint, remove the migration logic from the job, and resume from the savepoint with
the new code.

If there is no such point where you can be sure that you have migrated the state for all keys, then you
just keep your job running like this, i.e. with the migration logic.

The problem with the above strategy is that, if you never reach a point where you can be sure
that you have seen all keys and you want to migrate once again in the future, you will have to implement
the same thing but migrate from two different previous versions. At that point, though, you may have a policy
that says: if I have not seen a key for the last week or month, then I do not consider it active and I do not
care about it.
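A sketch of what such a later migration from two previous versions could look like, combined with an inactivity policy. All types and names are hypothetical (UserV3 with a phone field stands in for some future change), and it assumes the job additionally keeps a per-key last-seen timestamp, which the job in this thread does not do.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

/**
 * Hypothetical sketch: upgrade state from either of two older versions to the
 * latest one, but drop state of keys not seen for longer than maxIdle.
 */
public class MultiVersionMigrationSketch {

    static final class UserV1 { // original layout
        final String id, firstName, lastName;
        UserV1(String id, String firstName, String lastName) {
            this.id = id; this.firstName = firstName; this.lastName = lastName;
        }
    }

    static final class UserV2 { // adds emailAddress
        final String id, firstName, lastName, emailAddress;
        UserV2(String id, String firstName, String lastName, String emailAddress) {
            this.id = id; this.firstName = firstName; this.lastName = lastName;
            this.emailAddress = emailAddress;
        }
    }

    static final class UserV3 { // hypothetical future change: adds phone
        final String id, firstName, lastName, emailAddress, phone;
        UserV3(String id, String firstName, String lastName, String emailAddress, String phone) {
            this.id = id; this.firstName = firstName; this.lastName = lastName;
            this.emailAddress = emailAddress; this.phone = phone;
        }
    }

    // One step per version bump; null defaults for new fields are assumptions.
    static UserV2 v1ToV2(UserV1 u) {
        return new UserV2(u.id, u.firstName, u.lastName, null);
    }

    static UserV3 v2ToV3(UserV2 u) {
        return new UserV3(u.id, u.firstName, u.lastName, u.emailAddress, null);
    }

    /** Returns the state in the latest format, or empty if the key counts as inactive. */
    static Optional<UserV3> upgrade(Object stored, Instant lastSeen, Instant now, Duration maxIdle) {
        if (lastSeen.isBefore(now.minus(maxIdle))) {
            return Optional.empty(); // idle too long: drop the state instead of migrating it
        }
        if (stored instanceof UserV1) {
            return Optional.of(v2ToV3(v1ToV2((UserV1) stored))); // chain both steps
        }
        if (stored instanceof UserV2) {
            return Optional.of(v2ToV3((UserV2) stored));
        }
        if (stored instanceof UserV3) {
            return Optional.of((UserV3) stored);
        }
        throw new IllegalArgumentException("Unknown state version: " + stored);
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2018-04-01T00:00:00Z");
        Duration maxIdle = Duration.ofDays(30);
        UserV1 old = new UserV1("u1", "Ada", "Lovelace");

        // Seen 10 days ago: migrated through V2 to V3.
        System.out.println(upgrade(old, now.minus(Duration.ofDays(10)), now, maxIdle).isPresent()); // true
        // Seen 60 days ago: treated as inactive and dropped.
        System.out.println(upgrade(old, now.minus(Duration.ofDays(60)), now, maxIdle).isPresent()); // false
    }
}
```

Chaining single-version steps (V1 to V2, V2 to V3) keeps each step small; the idle cutoff bounds how many old versions the code has to keep supporting.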

I hope this helps!

Cheers,
Kostas



Re: State serialization problem when we add a new field in the object

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

Flink has supported upgrading serializers [1] [2] since version 1.3.
You will probably need to upgrade to Flink 1.3 before you can use this feature.
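Independently of Flink's serializer-upgrade mechanism, the underlying idea of a serializer that can still read older data can be sketched in plain Java: write a format version in front of each record and let the reader branch on it. This is a hypothetical format (VersionedUserFormat is an illustrative name), not Flink's TypeSerializer API, and it only helps for data written in this format from the start; it cannot read state that Kryo has already written in the 1.2 job.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/**
 * Hypothetical versioned binary format for User. Each record starts with a
 * format version, so records written before emailAddress existed stay readable.
 * Assumes id, firstName and lastName are never null.
 */
public class VersionedUserFormat {

    static final int CURRENT_VERSION = 2; // version 1 had no emailAddress

    static final class User {
        final String id, firstName, lastName, emailAddress; // emailAddress may be null
        User(String id, String firstName, String lastName, String emailAddress) {
            this.id = id;
            this.firstName = firstName;
            this.lastName = lastName;
            this.emailAddress = emailAddress;
        }
    }

    static void write(User u, DataOutputStream out) throws IOException {
        out.writeInt(CURRENT_VERSION);
        out.writeUTF(u.id);
        out.writeUTF(u.firstName);
        out.writeUTF(u.lastName);
        out.writeBoolean(u.emailAddress != null); // presence flag: writeUTF rejects null
        if (u.emailAddress != null) {
            out.writeUTF(u.emailAddress);
        }
    }

    // Writes the old version-1 layout; used here only to simulate existing data.
    static void writeV1(User u, DataOutputStream out) throws IOException {
        out.writeInt(1);
        out.writeUTF(u.id);
        out.writeUTF(u.firstName);
        out.writeUTF(u.lastName);
    }

    static User read(DataInputStream in) throws IOException {
        int version = in.readInt();
        if (version < 1 || version > CURRENT_VERSION) {
            throw new IOException("Unknown format version: " + version);
        }
        String id = in.readUTF();
        String firstName = in.readUTF();
        String lastName = in.readUTF();
        String email = null; // assumed default for records that predate the field
        if (version >= 2 && in.readBoolean()) {
            email = in.readUTF();
        }
        return new User(id, firstName, lastName, email);
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        writeV1(new User("u1", "Ada", "Lovelace", null), out);             // "old" record
        write(new User("u2", "Alan", "Turing", "alan@example.com"), out);  // "new" record
        out.flush();

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        System.out.println(read(in).emailAddress); // null
        System.out.println(read(in).emailAddress); // alan@example.com
    }
}
```

The presence flag for emailAddress is a design choice: DataOutputStream.writeUTF throws a NullPointerException for a null string, so an optional field needs some marker.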

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/custom_serialization.html
[2] https://issues.apache.org/jira/browse/FLINK-6178

2018-03-14 10:03 GMT+01:00 kla <la...@gmail.com>:

> Hi Aljoscha,
>
> Thanks for your reply.
>
> Do you have a suggestion how can we workaround it ?
>
> We have a production system running with Flink and it is mandatory to add
> one more field in the state.
>
> Maybe some how we can write our own serializer?
>
> Thanks,
> Konstantin

Re: State serialization problem when we add a new field in the object

Posted by kla <la...@gmail.com>.
Hi Aljoscha,

Thanks for your reply.

Do you have a suggestion for how we can work around it?

We have a production system running with Flink, and it is mandatory to add
one more field to the state.

Maybe we could somehow write our own serializer?

Thanks,
Konstantin

Re: State serialization problem when we add a new field in the object

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

I'm afraid Flink currently does not support changing the schema of state when restoring from a savepoint.
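To illustrate why such a schema change breaks the restore, here is a simplified model of a serializer that writes fields by position, with no field names or field count. This is an assumption for illustration (PositionalSchemaChangeDemo is a hypothetical name), and Kryo's actual format differs in detail: in the real job the mismatch surfaces as the IndexOutOfBoundsException inside Kryo's reference resolver seen in the original stack trace, but the underlying problem, a reader expecting a different field layout than the writer used, is plausibly the same.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

/**
 * Simplified, hypothetical model of a schema-less positional serializer:
 * data written with the old three-field User layout is read back with the
 * new four-field layout (emailAddress appended).
 */
public class PositionalSchemaChangeDemo {

    // Old layout: id, firstName, lastName (no field names, no field count).
    static void writeOldUser(DataOutputStream out, String id, String firstName, String lastName)
            throws IOException {
        out.writeUTF(id);
        out.writeUTF(firstName);
        out.writeUTF(lastName);
    }

    // New layout: id, firstName, lastName, emailAddress.
    static String[] readNewUser(DataInputStream in) throws IOException {
        String[] fields = new String[4];
        for (int i = 0; i < fields.length; i++) {
            // Nothing tells the reader that an old record ended after three fields.
            fields[i] = in.readUTF();
        }
        return fields;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        writeOldUser(out, "u1", "Ada", "Lovelace");
        writeOldUser(out, "u2", "Alan", "Turing");
        out.flush();

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        String[] first = readNewUser(in);
        // The fourth field is taken from the next record: prints "u2".
        System.out.println("first user's emailAddress = " + first[3]);
        try {
            readNewUser(in); // starts mid-record and runs out of data
        } catch (EOFException e) {
            System.out.println("second user: EOFException");
        }
    }
}
```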

Best,
Aljoscha
