Posted to user@flink.apache.org by Elias Levy <fe...@gmail.com> on 2018/07/06 21:31:34 UTC

StateMigrationException when switching from TypeInformation.of to createTypeInformation

During some refactoring we changed a job using managed state from:

ListStateDescriptor("config", TypeInformation.of(new TypeHint[ConfigState]() {}))

to

ListStateDescriptor("config", createTypeInformation[ConfigState])

After this change, Flink refused to start the new job from a savepoint or
checkpoint, raising StateMigrationException instead.

Why is Flink raising this error?  Both TypeInformation.of and
createTypeInformation return TypeInformation[ConfigState], so why does it
think the state type has changed?

Re: StateMigrationException when switching from TypeInformation.of to createTypeInformation

Posted by Till Rohrmann <tr...@apache.org>.
Hi Elias,

I think introducing a new state and then deprecating the old one is
currently the only way to solve this problem.

The community is currently working on supporting state evolution [1]. With
this feature it should be possible to change serializers between two
savepoints. Unfortunately, the feature could not be completed for Flink
1.6. But I think it will be in the master soon.

[1] https://issues.apache.org/jira/browse/FLINK-9376

Cheers,
Till

On Sun, Jul 15, 2018 at 12:11 AM Elias Levy <fe...@gmail.com>
wrote:

> Apologies for the delay.  I've been traveling.
>
> On Mon, Jul 9, 2018 at 8:44 AM Till Rohrmann <tr...@apache.org> wrote:
>
>> could you check whether `TypeInformation.of(new TypeHint[ConfigState]() {})`
>> and `createTypeInformation[ConfigState]` return the same `TypeInformation`
>> subtype? The problem is that the former goes through the Java TypeExtractor
>> whereas the latter goes through the Scala `TypeUtils#createTypeInfo`, where
>> the resulting `TypeInformation` is created via Scala macros. It must be the
>> case that the Scala `TypeUtils` generates a different `TypeInformation`
>> (e.g. Java generating a GenericTypeInfo whereas Scala generates a
>> TraversableTypeInfo).
>>
>
> TypeInformation.of returns a GenericTypeInfo, and toString reports it
> as GenericType<scala.collection.mutable.Map>.
>
> createTypeInformation returns an anonymous class but toString reports it
> as interface scala.collection.mutable.Map[scala.Tuple2(_1: String, _2:
> scala.Tuple2(_1: GenericType<me.doubledutch.lazyjson.LazyObject>, _2:
> byte[]))].
>
> Looks like you are correct about the Java version using GenericTypeInfo.
> I suppose the only way around this, if we wanted to move over to
> createTypeInformation, is to release a job that supports both types and
> upgrade the state from one to the other, then drop support for the older
> state.  Yes?
>
>> It would also be helpful if you could share the definition of
>> `ConfigState` in order to test it ourselves.
>>
>
> ConfigState is defined as type ConfigState =
> mutable.Map[String,ConfigStateValue] and ConfigStateValue is defined as type
> ConfigStateValue = (LazyObject,Array[Byte]).  LazyObject is from the
> Doubledutch LazyJSON <https://github.com/doubledutch/LazyJSON> package.
>

Re: StateMigrationException when switching from TypeInformation.of to createTypeInformation

Posted by Elias Levy <fe...@gmail.com>.
Apologies for the delay.  I've been traveling.

On Mon, Jul 9, 2018 at 8:44 AM Till Rohrmann <tr...@apache.org> wrote:

> could you check whether `TypeInformation.of(new TypeHint[ConfigState]() {})`
> and `createTypeInformation[ConfigState]` return the same `TypeInformation`
> subtype? The problem is that the former goes through the Java TypeExtractor
> whereas the latter goes through the Scala `TypeUtils#createTypeInfo`, where
> the resulting `TypeInformation` is created via Scala macros. It must be the
> case that the Scala `TypeUtils` generates a different `TypeInformation`
> (e.g. Java generating a GenericTypeInfo whereas Scala generates a
> TraversableTypeInfo).
>

TypeInformation.of returns a GenericTypeInfo, and toString reports it as
GenericType<scala.collection.mutable.Map>.

createTypeInformation returns an anonymous class but toString reports it as
interface scala.collection.mutable.Map[scala.Tuple2(_1: String, _2:
scala.Tuple2(_1: GenericType<me.doubledutch.lazyjson.LazyObject>, _2:
byte[]))].
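
For anyone who wants to repeat this comparison, here is a minimal sketch of
the check. It assumes a Flink 1.x release that still ships the Scala API and
the createSerializer(ExecutionConfig) method. The object name CompareTypeInfo
is made up for illustration, and String stands in for LazyObject so that no
extra dependency is needed, which means the printed element types will not
match the output above exactly.

```scala
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.api.scala._ // brings the createTypeInformation macro into scope

import scala.collection.mutable

// Hypothetical helper object, not part of the original job.
object CompareTypeInfo {
  // Stand-in for the thread's ConfigState: String replaces LazyObject (assumption).
  type ConfigState = mutable.Map[String, (String, Array[Byte])]

  // Goes through the Java TypeExtractor.
  val viaJava: TypeInformation[ConfigState] =
    TypeInformation.of(new TypeHint[ConfigState]() {})

  // Goes through the Scala macro.
  val viaScala: TypeInformation[ConfigState] =
    createTypeInformation[ConfigState]

  def main(args: Array[String]): Unit = {
    val config = new ExecutionConfig
    val candidates = Seq(
      "TypeInformation.of" -> viaJava,
      "createTypeInformation" -> viaScala)

    for ((label, info) <- candidates) {
      println(s"$label: $info")
      println(s"  type info class:  ${info.getClass.getName}")
      // The serializer is what ends up in the savepoint, so this is the
      // class that has to match on restore.
      println(s"  serializer class: ${info.createSerializer(config).getClass.getName}")
    }
  }
}
```

If the two serializer classes differ, the bytes written under the old
descriptor cannot be read by the new one, which would be consistent with the
StateMigrationException on restore.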

Looks like you are correct about the Java version using GenericTypeInfo.  I
suppose the only way around this, if we wanted to move over to
createTypeInformation, is to release a job that supports both types and
upgrade the state from one to the other, then drop support for the older
state.  Yes?
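
A rough sketch of what such an intermediate release could look like, under
several assumptions that are not stated in this thread: the state is operator
list state accessed through CheckpointedFunction, the new state is registered
under a made-up name "config-v2", and String again stands in for LazyObject.
The names ConfigStateMigration and MigratesConfigState are hypothetical.

```scala
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.api.scala._ // createTypeInformation macro
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction

import scala.collection.JavaConverters._
import scala.collection.mutable

object ConfigStateMigration {
  // Stand-in for the thread's ConfigState (assumption: String replaces LazyObject).
  type ConfigState = mutable.Map[String, (String, Array[Byte])]

  // Old descriptor, unchanged, so existing savepoints still restore.
  val legacyDescriptor = new ListStateDescriptor[ConfigState](
    "config", TypeInformation.of(new TypeHint[ConfigState]() {}))

  // New descriptor under a different (hypothetical) state name.
  val currentDescriptor = new ListStateDescriptor[ConfigState](
    "config-v2", createTypeInformation[ConfigState])
}

// Mix this into the function that owns the state.
trait MigratesConfigState extends CheckpointedFunction {
  import ConfigStateMigration._

  @transient protected var configState: ListState[ConfigState] = _

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val store = context.getOperatorStateStore
    val legacy = store.getListState(legacyDescriptor)
    configState = store.getListState(currentDescriptor)

    if (context.isRestored) {
      // Copy whatever the old state still holds into the new state, then
      // empty the old state so the copy happens only once.
      val old = legacy.get().asScala.toList
      if (old.nonEmpty) {
        old.foreach(value => configState.add(value))
        legacy.clear()
      }
    }
  }

  // Nothing extra to do here: both list states are snapshotted by Flink.
  override def snapshotState(context: FunctionSnapshotContext): Unit = ()
}
```

Once a savepoint has been taken from a job running this version, a later
release could presumably drop legacyDescriptor and keep only "config-v2".
That last step is worth testing against a real savepoint before relying on it.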

> It would also be helpful if you could share the definition of `ConfigState`
> in order to test it ourselves.
>

ConfigState and ConfigStateValue are defined as:

type ConfigState = mutable.Map[String,ConfigStateValue]
type ConfigStateValue = (LazyObject,Array[Byte])

LazyObject is from the Doubledutch LazyJSON
<https://github.com/doubledutch/LazyJSON> package.

Re: StateMigrationException when switching from TypeInformation.of to createTypeInformation

Posted by Till Rohrmann <tr...@apache.org>.
Hi Elias,

could you check whether `TypeInformation.of(new TypeHint[ConfigState]() {})`
and `createTypeInformation[ConfigState]` return the same `TypeInformation`
subtype? The problem is that the former goes through the Java TypeExtractor
whereas the latter goes through the Scala `TypeUtils#createTypeInfo`, where
the resulting `TypeInformation` is created via Scala macros. It must be the
case that the Scala `TypeUtils` generates a different `TypeInformation`
(e.g. Java generating a GenericTypeInfo whereas Scala generates a
TraversableTypeInfo).

It would also be helpful if you could share the definition of `ConfigState`
in order to test it ourselves.

Cheers,
Till
