Posted to user@flink.apache.org by Niels <ni...@gmail.com> on 2018/02/19 15:14:14 UTC

Managed State Custom Serializer with Avro

Hi all, 

I'm currently trying to use Avro in order to evolve our data present in
Flink's Managed State. I've extended the TypeSerializer class successfully
for this purpose, but still have issues with schema evolution.

*The problem:*
When we try to read data (i.e. deserialize from a savepoint) with a new
serializer and a new schema, Flink seems to use the old schema of the old
serializer (written to the savepoint). This results in an old GenericRecord
that doesn't adhere to the new Avro schema.

*What seems to be happening is the following* (say we evolve from dataV1 to
dataV2):
- State containing dataV1 is serialized with Avro schema V1 to a
check/savepoint. Along with the data, the serializer itself is written.
- Upon restore, the old serializer is retrieved from the data (and therefore
needs to be on the classpath). Data is restored using this old serializer;
the new serializer provided is only used for writes.

If this is indeed the case, it explains the problem described above. Any
pointers as to whether this is true, and what a possible solution would be,
would be very much appreciated!
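
For reference, this is how plain Avro resolves such a change when the reader
is given both the writer and the reader schema, which is what we expected
Flink to do on restore (a minimal, self-contained sketch with made-up Data
schemas):

import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class SchemaResolutionSketch {
    static final Schema V1 = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Data\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"string\"}]}");
    static final Schema V2 = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Data\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"string\"},"
        + "{\"name\":\"count\",\"type\":\"long\",\"default\":0}]}");

    public static void main(String[] args) throws Exception {
        // serialize a record with the old schema, as the savepoint did
        GenericRecord dataV1 = new GenericData.Record(V1);
        dataV1.put("id", "abc");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(V1).write(dataV1, encoder);
        encoder.flush();

        // deserialize with writer schema V1 *and* reader schema V2: Avro
        // resolves the difference and fills the new field with its default
        Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        GenericRecord dataV2 =
            new GenericDatumReader<GenericRecord>(V1, V2).read(null, decoder);
        System.out.println(dataV2); // {"id": "abc", "count": 0}
    }
}

In our case it looks like the restored reader only ever sees schema V1, so
this resolution step never happens.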

Thanks! 
Niels




Re: Managed State Custom Serializer with Avro

Posted by Arvid Heise <ar...@gmail.com>.
Hi Aljoscha,

I opened https://issues.apache.org/jira/browse/FLINK-8715 for the
RocksDB issue with pointers to the code. Let me know if you need more
details.

Best,

Arvid


Re: Managed State Custom Serializer with Avro

Posted by Arvid Heise <ar...@gmail.com>.
Hi Aljoscha, hi Till,

@Aljoscha, the new AvroSerializer is almost what I wanted, except that
it does not use the schema of the snapshot while reading. In fact,
this version will fail with the same error as before when a field is
added or removed.
https://github.com/apache/flink/blob/f3a2197a23524048200ae2b4712d6ed833208124/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java#L265
needs to use the schema from
https://github.com/apache/flink/blob/f3a2197a23524048200ae2b4712d6ed833208124/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java#L188
as the first parameter. Accordingly, a readSchema field needs to be set
in #ensureCompatibility and relayed in #duplicate.
Should I add a ticket for that as well?
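
For clarity, roughly like this (a hedged sketch, not an actual patch;
schemaFromSnapshot and the surrounding names are illustrative):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.DatumReader;

// sketch of how the DatumReader should be built once the snapshot's
// schema is available
class ReaderSketch<T> {
    // would be captured in #ensureCompatibility and relayed in #duplicate
    private transient Schema schemaFromSnapshot;

    DatumReader<T> createReader(Schema currentSchema) {
        if (schemaFromSnapshot != null) {
            // writer schema = what the savepoint was written with,
            // reader schema = the current schema we evolve to
            return new GenericDatumReader<>(schemaFromSnapshot, currentSchema);
        }
        return new GenericDatumReader<>(currentSchema);
    }
}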

@Till, concerning the poor man's migration: the doc of
#ensureCompatibility in 1.3.2 states:

<li>{@link CompatibilityResult#compatible()}: this signals Flink that
this serializer is compatible, or has been reconfigured to be
compatible, to continue reading previous data, and that the
serialization schema remains the same. No migration needs to be
performed.</li>

The important part is the reconfiguration, which is also mentioned in
the main documentation. The default Avro and Kryo serializers actually
try to reconfigure themselves.
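
I.e. something along these lines (a sketch only; MyAvroSerializerConfigSnapshot
and schemaFromSnapshot are illustrative names, not Flink code):

@Override
public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
    if (configSnapshot instanceof MyAvroSerializerConfigSnapshot) {
        // reconfigure this serializer so it can keep reading the old data:
        // remember the schema the savepoint was written with
        this.schemaFromSnapshot =
            ((MyAvroSerializerConfigSnapshot) configSnapshot).getSchema();
        return CompatibilityResult.compatible();
    }
    // Flink 1.3 cannot actually migrate, so this makes the restore fail
    return CompatibilityResult.requiresMigration();
}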

@Aljoscha, I will open a ticket for the RocksDB thingy. I pinned the
problem down and will try to come up with an easy solution. It's a tad
hard to compare the different versions (since I'm deep into the
debugger), so I just might write a 1.3.2 ticket.

@Till, thanks for reminding me that we are not talking about
incremental checkpoints ;) That makes it indeed much easier to
understand the whole state recovery with evolution.

Best,

Arvid


Re: Managed State Custom Serializer with Avro

Posted by Till Rohrmann <tr...@apache.org>.
A small addition:

Currently, savepoints are always full checkpoints. Thus, you should not have
that problem when calling cancel with savepoint.

Concerning 2, I think the idea was to only check for compatibility at
restore time. The check will either say it's compatible or not. If it's not
compatible, then the program will fail, because Flink does not support
migration yet. Therefore, there should be no need to call
#ensureCompatibility on the StateDescriptor's serializer, if I'm not mistaken.

Cheers,
Till


Re: Managed State Custom Serializer with Avro

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Arvid,

Did you check out the most recent AvroSerializer code? https://github.com/apache/flink/blob/f3a2197a23524048200ae2b4712d6ed833208124/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java#L185 I think this does what you're suggesting.

Regarding the integration tests, if this is in fact the case it is not good and I would be very happy about a Jira Issue/PR there.

Regarding your last point, I think that the RocksDB backend stores the metadata, which includes the type serializer snapshot, only once, and not for all keys or key groups.

Best,
Aljoscha



Re: Managed State Custom Serializer with Avro

Posted by Niels <ni...@gmail.com>.
Hi guys,

I'm noticing similar things to what Arvid mentions. I currently solved the
issue by also supporting GenericRecords written and read with the old schema,
and parsing them to the new schema myself. This at least gives us evolution
until state migration is there.
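
Roughly like this (a simplified sketch of the workaround with an illustrative
helper, not the exact code):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;

// convert a GenericRecord restored in the old shape into the new schema;
// GenericRecordBuilder fills every field we do not set with its schema
// default (and fails fast if a new field has no default)
static GenericRecord upgrade(GenericRecord oldRecord, Schema newSchema) {
    GenericRecordBuilder builder = new GenericRecordBuilder(newSchema);
    for (Schema.Field field : newSchema.getFields()) {
        if (oldRecord.getSchema().getField(field.name()) != null) {
            builder.set(field, oldRecord.get(field.name()));
        }
    }
    return builder.build();
}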

Thanks for your help!

Cheers,
Niels




Re: Managed State Custom Serializer with Avro

Posted by Arvid Heise <ar...@gmail.com>.
Hi guys,

I just wanted to write about this topic myself.

The Flink Forward talk by Tzu-Li also gave me the impression that, just by
using AvroSerializer, we get some kind of state evolution for free:
https://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-tzuli-gordon-tai-managing-state-in-apache-flink

However, I discovered two issues on 1.3.2:

1. The AvroSerializer does not use separate read/write schemas. The snapshot
stores type information instead of the more useful schema information.
However, the actual type should not matter as long as a compatible
type is used for state restoration.
I have rewritten the AvroSerializer to store the schema in the
snapshot config and to actually use it as the read schema during the
initialization of the DatumReader.
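
For illustration, a minimal sketch of such a config snapshot, based on my
reading of the 1.3 TypeSerializerConfigSnapshot contract (the class name is
illustrative):

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

public class AvroSchemaConfigSnapshot extends TypeSerializerConfigSnapshot {
    private String schemaString;

    public AvroSchemaConfigSnapshot() {}  // nullary constructor for restore

    public AvroSchemaConfigSnapshot(Schema schema) {
        this.schemaString = schema.toString();
    }

    @Override
    public void write(DataOutputView out) throws IOException {
        super.write(out);
        out.writeUTF(schemaString);  // persist the write schema
    }

    @Override
    public void read(DataInputView in) throws IOException {
        super.read(in);
        this.schemaString = in.readUTF();  // recover it on restore
    }

    @Override
    public int getVersion() { return 1; }

    public Schema getSchema() { return new Schema.Parser().parse(schemaString); }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof AvroSchemaConfigSnapshot
            && schemaString.equals(((AvroSchemaConfigSnapshot) obj).schemaString);
    }

    @Override
    public int hashCode() { return schemaString.hashCode(); }
}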

2. During integration tests, it turned out that the current
implementation of StateDescriptor always returns copies of the
serializer through #getSerializer. So #ensureCompatibility is invoked
on a different serializer instance than the one the actual #deserialize
later runs on. Although my AvroSerializer sets the correct read schema,
it is not used, since it is set on the wrong instance.
I propose to make sure that #ensureCompatibility is invoked on the
original serializer in the state descriptor. Otherwise, all adjustments
to the serializer are lost.
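
A tiny illustration of the effect (MyAvroSerializer and schema are
hypothetical placeholders):

// each #getSerializer call hands out a duplicate, so the instance that
// #ensureCompatibility reconfigured is not the instance that later runs
// #deserialize
ValueStateDescriptor<GenericRecord> descriptor =
    new ValueStateDescriptor<>("my-state", new MyAvroSerializer(schema));

TypeSerializer<GenericRecord> checked = descriptor.getSerializer();
TypeSerializer<GenericRecord> used = descriptor.getSerializer();
// distinct copies: the read schema set on 'checked' is lost for 'used'
assert checked != used;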

I can provide tests and patches if needed.

One related question:

If I do an incremental snapshot with the RocksDB keyed state
backend, is the snapshot config attached to all keys? That is, would the
following work:
* Write (key1, value1) and (key2, value2) with schema1. Do cancel with snapshot.
* Read (key1, value1) with schema1->schema2 and write with (key1,
value1). Do cancel with snapshot.
<Now we have two different schemas in the snapshots>
* Read (key1, value1) with schema2 and read with (key2, value2) with
schema1->schema2.

Thanks for any feedback

Arvid


Re: Managed State Custom Serializer with Avro

Posted by Niels Denissen <ni...@gmail.com>.
Hi Till,

Thanks for the quick reply, I'm using 1.3.2 atm.

Cheers,
Niels


Re: Managed State Custom Serializer with Avro

Posted by Till Rohrmann <tr...@apache.org>.
Hi Niels,

which version of Flink are you using? Currently, Flink does not support
upgrading the TypeSerializer itself, if I'm not mistaken. As you've
described, it will try to use the old serializer stored in the checkpoint
stream to restore state.

I've pulled Gordon into the conversation, who can tell you a little bit more
about the current capabilities and limitations of state evolution.

Cheers,
Till
