Posted to user@flink.apache.org by Petter Arvidsson <pe...@relayr.io> on 2018/04/18 09:00:41 UTC

Managing state migrations with Flink and Avro

Hello everyone,

I am trying to figure out how to set up Flink with Avro for state
management (especially the content of snapshots) to enable state migrations
(e.g. adding a nullable field to a class). So far, in version 1.4.2, I
tried to explicitly provide an instance of "new
AvroTypeInfo(Accumulator.getClass())", where Accumulator is a very simple
Avro-generated SpecificRecordBase with the following schema:

{"namespace": "io.relayr.flink",
 "type": "record",
 "name": "Accumulator",
 "fields": [
     {"name": "accumulator", "type": "int"}
 ]
}

This successfully saves the state to the snapshot. When I then try to load
the snapshot with an updated schema (adding the nullable field), it fails.
The updated schema looks like this:

{"namespace": "io.relayr.flink",
 "type": "record",
 "name": "Accumulator",
 "fields": [
     {"name": "accumulator", "type": "int"},
     {"name": "newStuff", "type": ["int", "null"]}
 ]
}
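
(A side note for readers of the archive: for Avro schema resolution to fill in a newly added field when reading data written with the old schema, the field generally needs a "default" value, and by convention "null" is listed first in the union when the default is null. The schema above declares neither. A sketch of a schema that should evolve cleanly under Avro's resolution rules — not the schema the poster actually used — would be:

```json
{"namespace": "io.relayr.flink",
 "type": "record",
 "name": "Accumulator",
 "fields": [
     {"name": "accumulator", "type": "int"},
     {"name": "newStuff", "type": ["null", "int"], "default": null}
 ]
}
```

With the default present, a reader using the new schema can decode old records by substituting null for the missing field.)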

When I try to restart the Job from the snapshot, I get the following
exception:
2018-04-17 09:35:23,519 WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  - Deserialization of serializer errored; replacing with null.
java.io.IOException: Unloadable class for type serializer.
...
Caused by: java.io.InvalidClassException: io.relayr.flink.Accumulator;
local class incompatible: stream classdesc serialVersionUID =
-3555733236161157838, local class serialVersionUID = 5291033088112484292

This is true: the Avro tools do generate a new serialVersionUID for the
bean. I just didn't expect it to be used; I expected the Avro schema to be
used instead. Did anyone get this working? What am I getting wrong?

Best regards,
Petter

Re: Managing state migrations with Flink and Avro

Posted by Petter Arvidsson <pe...@relayr.io>.
Hi Timo,

Thanks for your response. We are using the filesystem state backend backed
by S3.

We were looking for a good long-term solution with Avro, so manually
changing the serial version id is probably not the right way to proceed for
us. I think we will wait for Flink 1.6 before trying to properly implement
state migrations in this case.

Regards,
Petter


Re: Managing state migrations with Flink and Avro

Posted by Timo Walther <tw...@apache.org>.
Hi Petter,

which state backend are you using in your case? I think there is no
quick solution for your problem, because a proper schema evolution story
is on the roadmap for Flink 1.6.

Would it work to change the serialVersionUID of the generated Avro
class as a temporary workaround?
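
To make the workaround concrete, here is a minimal sketch. It assumes a
hand-written stand-in for the generated Accumulator bean (the real class
extends SpecificRecordBase); the point is only that declaring
serialVersionUID explicitly, copied from the value recorded in the old
snapshot, keeps Java's class-compatibility check satisfied across
regenerations of the class:

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Hypothetical stand-in for the Avro-generated bean. Pinning
// serialVersionUID to the value from the old snapshot
// (-3555733236161157838 in the original exception) makes the
// regenerated class look compatible to Java serialization, even
// after a field is added.
class Accumulator implements Serializable {
    private static final long serialVersionUID = -3555733236161157838L;
    int accumulator;
    Integer newStuff; // newly added nullable field
}

public class Main {
    public static void main(String[] args) {
        // Without the explicit field, Java would recompute this hash
        // from the class shape, and it would change when a field is added.
        long uid = ObjectStreamClass.lookup(Accumulator.class)
                .getSerialVersionUID();
        if (uid != -3555733236161157838L) {
            throw new AssertionError("serialVersionUID not pinned: " + uid);
        }
        System.out.println("serialVersionUID = " + uid);
    }
}
```

Of course this only silences the InvalidClassException; it does not give
real schema evolution, which is why it is a temporary workaround.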

Regards,
Timo




Re: Managing state migrations with Flink and Avro

Posted by Timo Walther <tw...@apache.org>.
Thank you. Maybe we already identified the issue (see 
https://issues.apache.org/jira/browse/FLINK-9202). I will use your code 
to verify it.

Regards,
Timo




Re: Managing state migrations with Flink and Avro

Posted by Petter Arvidsson <pe...@relayr.io>.
Hi Timo,

Please find the generated class (for the second schema) attached.

Regards,
Petter


Re: Managing state migrations with Flink and Avro

Posted by Timo Walther <tw...@apache.org>.
Hi Petter,

could you share the source code of the class that Avro generates out of 
this schema?

Thank you.

Regards,
Timo
