Posted to user@beam.apache.org by Cristian Constantinescu <ze...@gmail.com> on 2022/11/18 17:21:00 UTC

Beam, Flink state and Avro Schema Evolution is problematic

Hi everyone,

I'm using Beam on Flink with Avro-generated records. If the record
schema changes, the Flink state cannot be restored. I'm sending this
email out for anyone who may need this info in the future, and also
to ask others for possible solutions, because this problem is so
easily hit that I have a hard time seeing how other users of Beam on
the Flink runner avoid it.

The in-depth discussion of the issue can be found here [1] (thanks
Maximilian). There are also a few more emails about this here [2], and
here [3].

The gist of the issue is that Beam serializes the coders it uses into
the Flink state, and some of those coders hold references to the
bean/POJO/Java classes they serialize and deserialize. Flink
serializes its state using Java serialization, which means the Flink
state contains a reference to the bean/POJO class name and its
serialVersionUID. When the (Avro-generated) POJO changes, so does its
serialVersionUID, and Flink can no longer deserialize the Beam state
because the serialVersionUID doesn't match: not on the Coder itself,
but on the POJO type the coder was holding when it was serialized.
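
To make the failure mode concrete, plain Java reproduces it (Avro's
generated classes pin serialVersionUID to a fingerprint of the schema,
so evolving the schema has the same effect as recompiling the class
below with an extra field):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerialVersionDemo {
  // Stands in for the POJO class a Beam coder holds a reference to.
  static class Holder implements Serializable {
    // No explicit serialVersionUID: the JVM derives one from the class
    // structure, so adding a field later changes it.
    String dummy1;
  }

  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(new Holder());
    }
    // Works now; recompile Holder with an extra field (say "String dummy2;")
    // and run only the read below against the old bytes, and readObject()
    // throws java.io.InvalidClassException: ... local class incompatible.
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      System.out.println(in.readObject());
    }
  }
}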

I decided to try each coder capable of handling POJOs, one by one, to
see if any would work. That is, I tried SerializableCoder, AvroCoder,
and SchemaCoder/RowCoder. For AvroCoder I used the SpecificRecord
version (not the GenericRecord one), and for SchemaCoder the variant
that returns the POJO type rather than Row. They all failed the test
below (spelled out to be very explicit, but really, it's just simple
schema evolution).

Test:
1. Create an Avro POJO (IDL-generated):

   record FooRecord {
     union {null, string} dummy1 = null;
   }
2. Create a pipeline with a simple stateful DoFn (sketched after this
list), set the desired coder for FooRecord (I tried SerializableCoder,
AvroCoder, and SchemaCoder/RowCoder), and populate state with a few
FooRecord objects.
3. Start the pipeline.
4. Stop the pipeline with a savepoint.
5. Augment FooRecord to add another field after dummy1 (evolved IDL
shown after this list).
6. Start the pipeline, restoring from the saved savepoint.
7. Observe this exception when deserializing the savepoint:
"Caused by: java.io.InvalidClassException: com.mymodels.FooRecord;
local class incompatible: stream classdesc serialVersionUID = <some
number>, local class serialVersionUID = <some other number>"
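
For reference, the stateful DoFn in step 2 looks roughly like this (a
minimal sketch; the names are made up):

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Writes each incoming FooRecord into keyed state. Whatever coder is
// registered for FooRecord is what gets used for the state, and ends up
// serialized into the savepoint.
static class StashFn extends DoFn<KV<String, FooRecord>, FooRecord> {
  @StateId("lastSeen")
  private final StateSpec<ValueState<FooRecord>> lastSeen = StateSpecs.value();

  @ProcessElement
  public void process(
      @Element KV<String, FooRecord> element,
      @StateId("lastSeen") ValueState<FooRecord> state,
      OutputReceiver<FooRecord> out) {
    state.write(element.getValue());
    out.output(element.getValue());
  }
}

And the evolved record for step 5 (the new field name is just an example):

   record FooRecord {
     union {null, string} dummy1 = null;
     union {null, string} dummy2 = null;
   }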

There are a few workarounds.

Workaround A:
Right now my working solution is to implement what Pavel suggested
(thanks Pavel) in [3]. Quoting him: "having my business
logic-related POJOs still Avro-generated, but I introduced another,
generic one, which just stores schema & payload bytes, and does not
need to change. then using a DelegateCoder that converts the POJO
to/from that generic schema-bytes pojo that never changes".

Basically something like this (first the wrapper's IDL, then the coder
wiring):

record FlinkStateValue {
  string schema;
  bytes value;
}

import java.nio.ByteBuffer;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DelegateCoder;
import org.apache.beam.sdk.util.CoderUtils;

// CoderUtils turns the nested coder's stream-based encode/decode into byte
// arrays; Avro's "bytes" type maps to java.nio.ByteBuffer on the generated class.
var fooCoder = AvroCoder.of(FooRecord.class);
var delegateCoder = DelegateCoder.of(
    AvroCoder.of(FlinkStateValue.class),
    (FooRecord in) -> FlinkStateValue.newBuilder()
        .setSchema(in.getSchema().toString())
        .setValue(ByteBuffer.wrap(CoderUtils.encodeToByteArray(fooCoder, in)))
        .build(),
    (FlinkStateValue in) ->
        CoderUtils.decodeFromByteArray(fooCoder, in.getValue().array()));

p.getCoderRegistry().registerCoderForClass(FooRecord.class, delegateCoder);
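
One caveat: as written, the decode side ignores the stored schema and
assumes the bytes were written with the current FooRecord schema. For
the restore-after-evolution case to actually work, decode has to do
Avro reader/writer resolution against the stored writer schema,
roughly like this (a hedged sketch using plain Avro APIs, as a
replacement for the decode lambda above):

import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;

// Resolve the stored writer schema against the current (reader) schema
// so bytes written with the old FooRecord decode into the evolved one.
(FlinkStateValue in) -> {
  Schema writerSchema = new Schema.Parser().parse(in.getSchema().toString());
  DatumReader<FooRecord> reader =
      new SpecificDatumReader<>(writerSchema, FooRecord.getClassSchema());
  Decoder decoder =
      DecoderFactory.get().binaryDecoder(in.getValue().array(), null);
  return reader.read(null, decoder);
}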

The downside is that there's now yet another serialization step, which
wastes CPU cycles. The upside is that things are decoupled: I think
the DelegateCoder could use a RowCoder for FooRecord instead of
AvroCoder.of(FooRecord.class), or any other coder for that matter,
and you can switch between them with only a code change.

Workaround B:
Difficulty: hard! Use the Flink State Processor API [4] to update the
Beam serialized state: rewrite the FooRecord serialVersionUID stored
in that state to the new, post-evolution one, save the modified state,
and start your pipeline with the evolved FooRecord.
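
Very roughly, using the State Processor API from [4] (a heavily hedged
sketch; the savepoint path is a placeholder, and the byte-level
patching of the Beam state is the hard part I'm glossing over):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;

// Load the savepoint so its state can be read, patched (the serialized
// FooRecord serialVersionUID rewritten), and written back out.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ExistingSavepoint savepoint =
    Savepoint.load(env, "file:///path/to/savepoint", new MemoryStateBackend());
// ... read the relevant operator/keyed state, rewrite it, and produce a
// patched savepoint to restore the pipeline from.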

Workaround C:
Wrap the Avro-generated FooRecord in a real POJO or AutoValue, or
anything over which you have full control of the serialVersionUID, and
use that in your pipeline, especially when putting things into state.

The problem arises when the Avro-generated records have lots of
properties and/or nested records. It becomes tedious to essentially
duplicate them as POJOs/AutoValues.
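
For a small record, the duplication looks something like this (a
minimal sketch; the class name and mapping helpers are made up):

import java.io.Serializable;

// A hand-written mirror of FooRecord with a pinned serialVersionUID,
// so Java-serialized state referencing this class stays stable.
public class FooRecordPojo implements Serializable {
  private static final long serialVersionUID = 1L; // under our control

  private String dummy1; // nullable, mirroring union {null, string}

  public static FooRecordPojo from(FooRecord r) {
    FooRecordPojo p = new FooRecordPojo();
    p.dummy1 = r.getDummy1() == null ? null : r.getDummy1().toString();
    return p;
  }

  public FooRecord toAvro() {
    return FooRecord.newBuilder().setDummy1(dummy1).build();
  }
}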

Conclusion:
I want to end by asking the community for advice. For those of you
who use Beam with Avro records on the Flink runner, how do you handle
state when the Avro schema inevitably evolves?

It seems like such a simple use case and such an easy pitfall to fall
into that I'm unsure why only three people (four including me) have
asked for advice on this issue. Are the four of us doing something
wrong?

Thanks in advance for your advice,
Cristian

[1] https://www.mail-archive.com/user@beam.apache.org/msg05648.html
[2] https://www.mail-archive.com/user@beam.apache.org/msg07169.html
[3] https://lists.apache.org/thread/rlnljx684pvg3fvfv3nxvbdbnxg19nns
[4] https://flink.apache.org/feature/2019/09/13/state-processor-api.html

Re: Beam, Flink state and Avro Schema Evolution is problematic

Posted by Alexey Romanenko <ar...@gmail.com>.
+ dev

Many thanks for sharing your observations and findings on this topic, Cristian!
I copy it to dev@ as well to attract more attention to this problem.

—
Alexey



