Posted to user@flink.apache.org by Biplob Biswas <re...@gmail.com> on 2017/08/09 15:19:22 UTC

Evolving serializers and impact on flink managed states

Hi, 

We have a set of XSDs that define the schema for our data, and we use
the corresponding generated POJOs for serialization with the Flink
serialization stack. I am concerned about any evolution of our XSD schema,
since it will change the generated POJOs, which in turn are used for creating
the serdes. I am also concerned about how managed state will behave in that
case, as it would be serialized using serializers defined over the
old/new POJOs.

In that regard, I read the section "Handling serializer upgrades and
compatibility"
<https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#handling-serializer-upgrades-and-compatibility>
and found that there is a plan to do exactly that. It would involve
state migration, such that old data is read with the old serializer and
then serialized back with the new serializer.

Now I have a few questions regarding the same.

1. The article in question probably assumes Flink's own serialization. What
if I use an Avro serde for serialization and deserialization? If I
create a savepoint of my job, stop Flink, load the new POJO and continue
from the savepoint, would Avro's schema evolution feature handle the
transition smoothly?
For example, when a new entity is added, would all old values get a
default for the value they lack, and when an entity is
deleted, would its value simply be dropped?

2. If yes, how would this play out in the Flink ecosystem? If not, would
future Flink serialization upgrades handle such cases (forward and
backward compatibility)?

3. Is managed state also stored and reloaded when savepoints are created
and used for resuming a job?

4. When can one expect to have the state migration feature in Flink? In
1.4.0? 

Thanks & Regards,
Biplob







--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Evolving-serializers-and-impact-on-flink-managed-states-tp14777.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.

Re: Evolving serializers and impact on flink managed states

Posted by Biplob Biswas <re...@gmail.com>.
Thanks a ton Stefan, that was really helpful. 





Re: Evolving serializers and impact on flink managed states

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

Yes, the assumption is correct. This is not an instability; it actually stops the user from corrupting data by attempting an unsupported operation. "Migration required" is the outcome of the compatibility check that would start a migration process. For Avro, the serializer does not have to signal this result, because the serializer itself can deal with all versions at all times. It can simply signal that it is compatible.
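
As a rough illustration of the two outcomes described above, here is a toy model in Python. It is not Flink's API: `CheckResult`, `check_fixed_schema`, `check_self_versioning` and `restore` are hypothetical names invented for this sketch. It only shows why a serializer that reads every version can always answer "compatible", while a serializer tied to one schema triggers "migration required", which makes the restore fail as long as no migration step exists.

```python
from enum import Enum


class CheckResult(Enum):
    """Hypothetical stand-in for the two outcomes of a compatibility check."""
    COMPATIBLE = "compatible"
    REQUIRES_MIGRATION = "requires migration"


class RestoreFailed(Exception):
    """Raised when state cannot be restored safely."""


def check_fixed_schema(stored_version, current_version):
    # A serializer bound to exactly one schema can only read its own version.
    if stored_version == current_version:
        return CheckResult.COMPATIBLE
    return CheckResult.REQUIRES_MIGRATION


def check_self_versioning(stored_version, readable_versions):
    # A serializer that can read every known version never asks for migration.
    if stored_version in readable_versions:
        return CheckResult.COMPATIBLE
    return CheckResult.REQUIRES_MIGRATION


def restore(result, migration_available=False):
    if result is CheckResult.COMPATIBLE:
        return "restored"
    if migration_available:
        return "migrated, then restored"
    # Failing here is safer than reading old bytes with the wrong schema.
    raise RestoreFailed("state migration required but not available")
```

With these definitions, `restore(check_self_versioning(1, {1, 2, 3}))` succeeds, whereas `restore(check_fixed_schema(1, 2))` raises `RestoreFailed`.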

Best,
Stefan

> On 11.08.2017 at 13:57, Biplob Biswas <re...@gmail.com> wrote:
> 
> Hi Stefan,
> 
> Thanks a lot for such a helpful response. That really made things a lot
> clearer for me. At this point I have one more, and probably last,
> question.
> 
> According to the Flink documentation, 
> 
> [Attention] Currently, as of Flink 1.3, if the result of the compatibility
> check acknowledges that state migration needs to be performed, the job
> simply fails to restore from the checkpoint as state migration is currently
> not available. The ability to migrate state will be introduced in future
> releases.
> 
> Now if I end up using AvroSerialization with RocksDB, I am assuming state
> migration wouldn't be needed, as the Avro serializer should automatically be
> compatible with the older versions as well, and thus my job wouldn't be
> affected by the lack of state migration.
> 
> Does my assumption sound correct? Can I expect this behaviour? 
> 
> Thanks a lot.
> 
> Best,
> Biplob


Re: Evolving serializers and impact on flink managed states

Posted by Biplob Biswas <re...@gmail.com>.
Hi Stefan,

Thanks a lot for such a helpful response. That really made things a lot
clearer for me. At this point I have one more, and probably last,
question.

According to the Flink documentation, 

[Attention] Currently, as of Flink 1.3, if the result of the compatibility
check acknowledges that state migration needs to be performed, the job
simply fails to restore from the checkpoint as state migration is currently
not available. The ability to migrate state will be introduced in future
releases.

Now if I end up using AvroSerialization with RocksDB, I am assuming state
migration wouldn't be needed, as the Avro serializer should automatically be
compatible with the older versions as well, and thus my job wouldn't be
affected by the lack of state migration.

Does my assumption sound correct? Can I expect this behaviour? 

Thanks a lot.

Best,
Biplob




Re: Evolving serializers and impact on flink managed states

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

> 1. The article in question probably assumes Flink's own serialization. What
> if I use an Avro serde for serialization and deserialization? If I
> create a savepoint of my job, stop Flink, load the new POJO and continue
> from the savepoint, would Avro's schema evolution feature handle the
> transition smoothly?
> For example, when a new entity is added, would all old values get a
> default for the value they lack, and when an entity is
> deleted, would its value simply be dropped?

Serializers can provide their own schema evolution. In this case, when the schema changes, the serializer would simply signal compatibility and deal with the schema versioning internally, transparently for Flink. However, the serializer should be able to deal with all schema versions at all times. Right now, e.g. for the RocksDB backend, it is impossible to tell if and when a state is updated and rewritten in the new schema, because an explicit conversion step is still lacking (as described in the documentation). How such a serializer deals with new and dropped entities is up to the implementation; Flink will simply accept whatever the serializer delivers. So Avro schema evolution should work.
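
To make "deal with the schema versioning internally" concrete, here is a minimal sketch of a self-versioning serializer in Python. It is an assumption-laden toy, not Avro's wire format or resolution rules and not a Flink `TypeSerializer`: the schema history `SCHEMAS`, the field names, and the one-byte version prefix are all invented for illustration. Each record is written with the version it was created under, and the reader projects any known version onto the current schema, filling defaults for added entities and dropping removed ones.

```python
import json
import struct

# Hypothetical schema history: version -> {field name: default value}.
SCHEMAS = {
    1: {"id": 0, "name": ""},
    2: {"id": 0, "name": "", "country": "unknown"},  # v2 adds "country"
    3: {"id": 0, "country": "unknown"},              # v3 drops "name"
}
CURRENT_VERSION = 3


def serialize(record, version=CURRENT_VERSION):
    """Write a 1-byte schema version followed by the JSON-encoded fields."""
    schema = SCHEMAS[version]
    fields = {name: record.get(name, default) for name, default in schema.items()}
    payload = json.dumps(fields, sort_keys=True).encode("utf-8")
    return struct.pack("B", version) + payload


def deserialize(data):
    """Read bytes written under any known version into the current schema."""
    (version,) = struct.unpack_from("B", data, 0)
    if version not in SCHEMAS:
        raise ValueError("unknown schema version: %d" % version)
    stored = json.loads(data[1:].decode("utf-8"))
    current = SCHEMAS[CURRENT_VERSION]
    # Added fields get their default; fields no longer in the schema are dropped.
    return {name: stored.get(name, default) for name, default in current.items()}
```

For example, bytes written as `serialize({"id": 7, "name": "a"}, version=1)` are read back by `deserialize` as a record with `id` 7 and the default `country`, with `name` dropped. Because old bytes may stay in the backend indefinitely, the reader has to keep every entry of `SCHEMAS` around.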

> 
> 2. If yes, how would this play out in the Flink ecosystem? If not, would
> future Flink serialization upgrades handle such cases (forward and
> backward compatibility)?

As I see it, you can now use serializers that have their own schema evolution and Flink will probably offer an additional, explicit way of schema evolution.

> 
> 3. Is managed state also stored and reloaded when savepoints are created
> and used for resuming a job?

This depends on the definition of "stored/reloaded" and on the state backend. If stored/reloaded means a round trip through serde, then the answer might be no for the RocksDB backend. This backend always holds the state serialized as bytes and goes through serde on every access/update. The checkpoint/savepoint is based on the stored bytes. In contrast, heap-based backends go through serialization on checkpoint/savepoint and deserialization on recovery/restore.
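
The difference between the two kinds of backend can be sketched with a toy model in Python. None of these classes exist in Flink; `CountingSerde`, `HeapLikeBackend` and `BytesLikeBackend` are hypothetical names, and JSON stands in for the real serializer. The sketch only counts when serde happens: the heap-like backend serializes on snapshot and deserializes on restore, while the bytes-like (RocksDB-style) backend goes through serde on each access and snapshots the stored bytes untouched.

```python
import json


class CountingSerde:
    """JSON serde that counts how often it is invoked."""

    def __init__(self):
        self.writes = 0
        self.reads = 0

    def serialize(self, value):
        self.writes += 1
        return json.dumps(value).encode("utf-8")

    def deserialize(self, data):
        self.reads += 1
        return json.loads(data.decode("utf-8"))


class HeapLikeBackend:
    """Keeps live objects; serde happens only on snapshot and restore."""

    def __init__(self, serde):
        self._serde = serde
        self._objects = {}

    def put(self, key, value):
        self._objects[key] = value

    def get(self, key):
        return self._objects[key]

    def snapshot(self):
        return {k: self._serde.serialize(v) for k, v in self._objects.items()}

    def restore(self, snap):
        self._objects = {k: self._serde.deserialize(b) for k, b in snap.items()}


class BytesLikeBackend:
    """Keeps serialized bytes; serde happens on every access and update."""

    def __init__(self, serde):
        self._serde = serde
        self._bytes = {}

    def put(self, key, value):
        self._bytes[key] = self._serde.serialize(value)

    def get(self, key):
        return self._serde.deserialize(self._bytes[key])

    def snapshot(self):
        return dict(self._bytes)  # copies the stored bytes, no serde involved

    def restore(self, snap):
        self._bytes = dict(snap)  # bytes stay in whatever schema wrote them
```

In this model, an entry in `BytesLikeBackend` that is never touched after an upgrade keeps its old-schema bytes across any number of snapshots, which is why the serializer has to stay able to read every version.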

> 
> 4. When can one expect to have the state migration feature in Flink? In
> 1.4.0? 

IIRC this is not part of the 1.4 roadmap. Flink 1.5 might be more realistic.

Best,
Stefan