Posted to user@flink.apache.org by Petter Arvidsson <pe...@relayr.io> on 2018/04/18 09:00:41 UTC
Managing state migrations with Flink and Avro
Hello everyone,
I am trying to figure out how to set up Flink with Avro for state
management (especially the content of snapshots) to enable state migrations
(e.g. adding a nullable field to a class). So far, in version 1.4.2, I
tried to explicitly provide an instance of "new
AvroTypeInfo(Accumulator.getClass())", where Accumulator is a very simple
Avro-generated SpecificRecordBase with the following schema:
{"namespace": "io.relayr.flink",
 "type": "record",
 "name": "Accumulator",
 "fields": [
   {"name": "accumulator", "type": "int"}
 ]
}
This successfully saves the state to the snapshot. When I then try to load
the snapshot with an updated schema (adding the nullable field), it fails.
The updated schema looks like this:
{"namespace": "io.relayr.flink",
 "type": "record",
 "name": "Accumulator",
 "fields": [
   {"name": "accumulator", "type": "int"},
   {"name": "newStuff", "type": ["int", "null"]}
 ]
}
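As an aside, and independent of the Flink serializer issue discussed below: Avro's own schema-resolution rules only fill in a field that is absent from the writer's data when the reader schema declares a default for it, and a default for a union must match the union's first branch. A nullable addition is therefore conventionally declared with "null" first and an explicit default; a sketch of how the second schema would usually be written:

```json
{"namespace": "io.relayr.flink",
 "type": "record",
 "name": "Accumulator",
 "fields": [
   {"name": "accumulator", "type": "int"},
   {"name": "newStuff", "type": ["null", "int"], "default": null}
 ]
}
```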
When I try to restart the Job from the snapshot, I get the following
exception:
2018-04-17 09:35:23,519 WARN org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil - Deserialization of serializer errored; replacing with null.
java.io.IOException: Unloadable class for type serializer.
...
Caused by: java.io.InvalidClassException: io.relayr.flink.Accumulator;
local class incompatible: stream classdesc serialVersionUID =
-3555733236161157838, local class serialVersionUID = 5291033088112484292
Which is true: Avro tools do generate a new serialVersionUID for the bean.
I just didn't expect it to be used; I expected the Avro schema to drive
compatibility instead. Did anyone get this working? What am I getting wrong?
Best regards,
Petter
Re: Managing state migrations with Flink and Avro
Posted by Petter Arvidsson <pe...@relayr.io>.
Hi Timo,
Thanks for your response. We are using the filesystem backend backed by S3.
We were looking for a good long-term solution with Avro, so manually
changing the serial version id is probably not the right way to proceed for
us. I think we will wait for Flink 1.6 before trying to properly implement
state migrations in this case.
Regards,
Petter
On Fri, Apr 20, 2018 at 11:20 AM, Timo Walther <tw...@apache.org> wrote:
> Hi Petter,
>
> which state backend are you using in your case? I think there is no quick
> solution for your problem because a proper schema evolution story is on the
> roadmap for Flink 1.6.
>
> Would it work to change the serial version id of the generated Avro class
> as a temporary workaround?
>
> Regards,
> Timo
Re: Managing state migrations with Flink and Avro
Posted by Timo Walther <tw...@apache.org>.
Hi Petter,
which state backend are you using in your case? I think there is no
quick solution for your problem because a proper schema evolution story
is on the roadmap for Flink 1.6.
Would it work to change the serial version id of the generated Avro
class as a temporary workaround?
Regards,
Timo
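A minimal sketch of that workaround, using only the JDK. The Accumulator class below is a hypothetical stand-in, not the actual Avro-generated bean: the idea is to pin serialVersionUID in the regenerated class to the value recorded in the old snapshot, which ObjectStreamClass can report.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialVersionUidCheck {
    // Hypothetical stand-in for the Avro-generated bean. Re-declaring the
    // constant with the UID from the snapshot's stream classdesc makes Java
    // serialization accept the regenerated class as compatible.
    static class Accumulator implements Serializable {
        private static final long serialVersionUID = -3555733236161157838L;
        int accumulator;
    }

    public static void main(String[] args) {
        // ObjectStreamClass reports the UID Java serialization will use
        // for this class; it matches the pinned constant.
        long uid = ObjectStreamClass.lookup(Accumulator.class).getSerialVersionUID();
        System.out.println(uid); // -3555733236161157838
    }
}
```

Note this only papers over the class-compatibility check; it does not give Avro-style schema evolution.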
Re: Managing state migrations with Flink and Avro
Posted by Timo Walther <tw...@apache.org>.
Thank you. Maybe we already identified the issue (see
https://issues.apache.org/jira/browse/FLINK-9202). I will use your code
to verify it.
Regards,
Timo
Re: Managing state migrations with Flink and Avro
Posted by Petter Arvidsson <pe...@relayr.io>.
Hi Timo,
Please find the generated class (for the second schema) attached.
Regards,
Petter
On Wed, Apr 18, 2018 at 11:32 AM, Timo Walther <tw...@apache.org> wrote:
> Hi Petter,
>
> could you share the source code of the class that Avro generates out of
> this schema?
>
> Thank you.
>
> Regards,
> Timo
>
Re: Managing state migrations with Flink and Avro
Posted by Timo Walther <tw...@apache.org>.
Hi Petter,
could you share the source code of the class that Avro generates out of
this schema?
Thank you.
Regards,
Timo