You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Alexey Romanenko <ar...@gmail.com> on 2022/11/23 17:15:19 UTC

Re: Beam, Flink state and Avro Schema Evolution is problematic

+ dev

Many thanks for sharing your observations and findings on this topic, Cristian!
I copy it to dev@ as well to attract more attention to this problem.

—
Alexey


> On 18 Nov 2022, at 18:21, Cristian Constantinescu <ze...@gmail.com> wrote:
> 
> Hi everyone,
> 
> I'm using Beam on Flink with Avro generated records. If the record
> schema changes, the Flink state cannot be restored. I just want to
> send this email out for anyone who may need this info in the future
> and also ask others for possible solutions as this problem is so
> easily hit, that I'm having a hard time figuring out what other users
> of Beam running on the Flink runner are doing to circumvent it.
> 
> The in-depth discussion of the issue can be found here [1] (thanks
> Maximilian). There are also a few more emails about this here [2], and
> here [3].
> 
> The gist of the issue is that Beam serializes the coders used into the
> Flink state, and some of those coders hold references to the
> Bean/Pojos/Java classes they serialize/deserialize to. Flink
> serializes its state using Java serialization, that means that in the
> Flink state we will get a reference to the Bean/Pojo/Java class name
> and the related serialVersionUID. When the pojo (Avro generated)
> changes, so does its serialVersionUID, and Flink cannot deserialize
> the Beam state anymore because the serialVersionUID doesn't match, not
> on the Coder, but on the Pojo type that coder was holding when it got
> serialized.
> 
> I decided to try each coder capable of handling Pojos, one by one, to
> see if any would work. That is, I tried the SerializableCoder,
> AvroCoder and the SchemaCoder/RowCoder. In the case of AvroCoder and
> SerializableCoder, I have used the SpecificRecord version (not the
> GenericRecord one) and the non-Row (ie: the one that returns a Pojo
> type, not Row type) version respectively. They all failed the below
> test (added it to be very explicit, but really, it's just simple
> schema evolution).
> 
> Test:
> 1. Create a avro pojo (idl generated pojo):
> record FooRecord {
> union {null, string} dummy1 = null;
> }
> 2. Create a pipeline with a simple stateful DoFn, set desired coder
> for FooRecord (I tried the SerializableCoder, AvroCoder and the
> SchemaCoder/RowCoder), and populate state with a few FooRecord
> objects.
> 3. Start the pipeline
> 4. Stop the pipeline with a savepoint.
> 5. Augment FooRecord to add another field after dummy1.
> 6. Start the pipeline restoring from the saved savepoint.
> 7. Observed this exception when deserializing the savepoint -->
> "Caused by: java.io.InvalidClassException: com.mymodels.FooRecord;
> local class incompatible: stream classdesc serialVersionUID = <some
> number>, local class serialVersionUID = <some other number>"
> 
> There are a few workarounds.
> 
> Workaround A:
> Right now my working solution is to implement what was suggested by
> Pavel (thanks Pavel) in [3]. Quote from him "having my business
> logic-related POJOs still Avro-generated, but I introduced another,
> generic one, which just stores schema & payload bytes, and does not
> need to change. then using a DelegateCoder that converts the POJO
> to/from that generic schema-bytes pojo that never changes".
> 
> Basically something like this (pseudocode):
> record FlinkStateValue {
> string schema;
> bytes value;
> }
> 
> var delegateCoder = DelegateCoder.of(
> AvroCoder.of(FlinkStateValue.class),
> (FooRecord in) ->
> FlinkStateValue.setSchema(FooRecord.getSchema()).setValue(AvroCoder.of(FooRecord.class).encode(in)),
> (FlinkStateValue in) -> return
> AvroCoder.of(FooRecord.class).decode(in.getValue())
> ) ;
> 
> p.getCoderRegistry().registerCoderForClass(FooRecord.class, delegateCoder)
> 
> The downside is that now there's yet another deserialization step,
> which wastes CPU cycles. The upside is that things are decoupled, that
> is, I think the DelegateCoder could use a RowCoder.of(FooRecord)
> instead of the AvroCoder.of(FooRecord), or any other coder for that
> matter and you can change between them with only a code change.
> 
> Workaround B:
> Difficulty hard! Use the Flink state api [4] and update the Beam
> serialized state to modify the FooRecord serialVersionUID stored in
> that state to the new one after the schema evolution, then save the
> state and start your pipeline with the evolved FooRecord.
> 
> Workaround C:
> Wrap the Avro generated FooRecord to a real Pojo or AutoValue or
> anything that you have full control over serialVersionUID, and use
> that in your pipeline especially when putting things into the state.
> 
> Problem arises when the Avro generated records have lots of properties
> and or nested records. It becomes tedious to essentially duplicate
> them to Pojo/AutoValue.
> 
> Conclusion:
> I want to end by asking advice from the community. For those of you
> who use Beam with Avro records running on the Flink runner, how do you
> handle state when the Avro schema inevitably evolves?
> 
> It just seems like it's such a simple use case and such an easy
> pittrap to fall into, that I'm unsure why there's only 3 people (4
> including me) who asked for advice for this issue. Are the 4 of us
> doing something wrong?
> 
> Thanks in advance for your advice,
> Cristian
> 
> [1] https://www.mail-archive.com/user@beam.apache.org/msg05648.html
> [2] https://www.mail-archive.com/user@beam.apache.org/msg07169.html
> [3] https://lists.apache.org/thread/rlnljx684pvg3fvfv3nxvbdbnxg19nns
> [4] https://flink.apache.org/feature/2019/09/13/state-processor-api.html