Posted to user@flink.apache.org by mrooding <ad...@webresource.nl> on 2017/09/20 12:16:27 UTC

Savepoints and migrating value state data types

Hi

We've got a situation where we're merging several Kafka streams and for
certain streams, we want to retain up to 6 days of history. We're trying to
figure out how we can migrate savepoint data between application updates
when the data type of a certain state buffer changes.

Let's assume that we have 2 streams with the following data types:

case class A(id: String, name: String)
case class B1(id: String, price: Double)

We have a CoProcessFunction which combines the 2 streams and maintains 2
different buffer states:

MapState[String, A] and ValueState[B1]

In our scenario, we're trying to anticipate the data type of B1 changing in
the future. Let's assume that in the foreseeable future, B1 will change to:

case class B2(id: String, price: Double, date: String)

When we create a snapshot using B1 and then upgrade the application to B2,
the obvious attempt would be to retrieve both the stored ValueState and the
new ValueState:

val oldState = getRuntimeContext.getState(
  new ValueStateDescriptor[B1]("1Buffer", createTypeInformation[B1]))
val newState = getRuntimeContext.getState(
  new ValueStateDescriptor[B2]("2Buffer", createTypeInformation[B2]))

However, as soon as you do, the following error occurs:

Unable to restore keyed state [aBuffer]. For memory-backed keyed state, the
previous serializer of the keyed state must be present; the serializer could
have been removed from the classpath, or its implementation have changed and
could not be loaded. This is a temporary restriction that will be fixed in
future versions.

Our assumption is that the process operator has a specified ID which Flink
uses to save and restore savepoints. Since the CoProcessFunction's type
signature changed from CoProcessFunction[A, B1, A] to
CoProcessFunction[A, B2, A], the savepoint data no longer applies to the
operator. Is this assumption correct?

We've been going through Flink's documentation and source code, and it seems
like there's no way to achieve this kind of migration. If that's the case,
we'd be interested in contributing to Flink to get this added as soon as
possible and would love some feedback on how to approach it.

Thanks in advance

Marc



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Savepoints and migrating value state data types

Posted by Nico Kruber <ni...@data-artisans.com>.
Hi Marc,
I assume you have set a UID for your CoProcessFunction as described in [1]?
Also, can you provide the Flink version you are working with and the 
serializer you are using?

If you have the UID set, your strategy seems to be the same as proposed by 
[2]: "Although it is not possible to change the data type of operator state, a 
workaround to overcome this limitation can be to define a second state with a 
different data type and to implement logic to migrate the state from the 
original state into the new state."
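For illustration, that two-state workaround could look roughly like the sketch below. The `migrate` helper, its default date, and the state names are hypothetical; the case classes come from the original post, and the Flink wiring is shown only in comments (not compiled against any particular Flink version):

```scala
// Old and new state types from the thread.
case class B1(id: String, price: Double)
case class B2(id: String, price: Double, date: String)

// Pure migration step: lift an old-format value into the new format.
// The default date is an assumption; use whatever suits your data.
def migrate(old: B1, defaultDate: String = "1970-01-01"): B2 =
  B2(old.id, old.price, defaultDate)

// Inside the CoProcessFunction (sketch only; state names illustrative):
//
//   override def open(parameters: Configuration): Unit = {
//     oldState = getRuntimeContext.getState(
//       new ValueStateDescriptor[B1]("bBuffer", createTypeInformation[B1]))
//     newState = getRuntimeContext.getState(
//       new ValueStateDescriptor[B2]("bBufferV2", createTypeInformation[B2]))
//   }
//
// In processElement, migrate lazily per key, then clear the old state:
//
//   Option(oldState.value()).foreach { b1 =>
//     newState.update(migrate(b1))
//     oldState.clear()
//   }
```

The pure `migrate` function keeps the conversion testable outside Flink; only the state plumbing lives in the operator.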

I'm no expert on this, but it looks like it should work (although I'm curious 
where the "aBuffer" in the error message comes from). I'm forwarding this to 
Gordon in CC because he probably knows better, as he was involved in state 
migration before (afaik).



Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/ops/
upgrading.html#application-state-compatibility
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/ops/
upgrading.html#stateful-operators-and-user-functions

On Wednesday, 20 September 2017 14:16:27 CEST mrooding wrote:


Re: Savepoints and migrating value state data types

Posted by Aljoscha Krettek <al...@apache.org>.
Actually, Flink 1.4 will come with improved Avro support. See especially:

 - https://issues.apache.org/jira/browse/FLINK-7420: Move All Avro Code to flink-avro
 - https://issues.apache.org/jira/browse/FLINK-7997: Avro should be always in the user code
 - https://issues.apache.org/jira/browse/FLINK-6022: Improve support for Avro GenericRecord

This makes AvroTypeInfo and AvroSerializer quite usable.

> On 6. Nov 2017, at 15:13, mrooding <ad...@webresource.nl> wrote:


Re: Savepoints and migrating value state data types

Posted by mrooding <ad...@webresource.nl>.
Hi Gordon

I've been looking into creating a custom AvroSerializer without Kryo which
would support Avro schemas and I'm starting to wonder if this is actually
the most straightforward way to do it. 

If I extend a class from TypeSerializer, I would also need to implement a
TypeInformation class to provide my serializer. Implementing all of these
classes seems to be quite an ordeal without proper documentation. Are you
sure that this is the right way forward and that there's no other option for
using Avro serialization with schema support in Flink?

Thanks again

Marc






Re: Savepoints and migrating value state data types

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

Yes, the AvroSerializer currently still uses Kryo, in part, for object copying.
Also, right now, I think the AvroSerializer is only used when the type is recognized as a POJO and `isForceAvroEnabled` is set on the job configuration. I'm not sure that is always possible.
As mentioned in [1], we would probably need to improve the user experience for Avro usage.

For now, if you want to use Avro directly only for serializing your state, AFAIK the most straightforward approach would be, as you mentioned, to implement a custom TypeSerializer that uses the Avro constructs.
Flink's AvroSerializer actually already sort of does this, so you can refer to that implementation as a baseline.
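As an aside, the core idea such a schema-aware state serializer implements can be sketched without any Flink or Avro dependencies: write a format version alongside the bytes, and let the reader default missing fields when it sees an old version. This is only an illustration of the versioning principle (the field layout and the "unknown" default are assumptions), not Flink's TypeSerializer API:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

case class B2(id: String, price: Double, date: String)

object VersionedB {
  // Version 2 of the format added the `date` field; readers of v1 bytes
  // fall back to a default value.
  val CurrentVersion = 2

  def serialize(b: B2): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val out = new DataOutputStream(bos)
    out.writeInt(CurrentVersion) // version tag precedes the payload
    out.writeUTF(b.id)
    out.writeDouble(b.price)
    out.writeUTF(b.date)
    out.flush()
    bos.toByteArray
  }

  def deserialize(bytes: Array[Byte]): B2 = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    in.readInt() match {
      case 1 => B2(in.readUTF(), in.readDouble(), date = "unknown") // old layout: no date field
      case 2 => B2(in.readUTF(), in.readDouble(), in.readUTF())
      case v => throw new IllegalStateException(s"Unknown format version $v")
    }
  }
}
```

A real Flink TypeSerializer would additionally implement copying, instance creation, and compatibility checks; Avro replaces the hand-rolled version tag above with writer/reader schema resolution.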

Cheers,
Gordon

[1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Serialization-performance-td12019.html
On 5 October 2017 at 4:39:10 PM, mrooding (admin@webresource.nl) wrote:


Re: Savepoints and migrating value state data types

Posted by mrooding <ad...@webresource.nl>.
Gordon

Thanks for the detailed response. I have verified your assumption and that
is, unfortunately, the case.

I also looked into creating a custom Kryo serializer but I got stuck on
serializing arrays of complex types. It seems like this isn't trivial at all
with Kryo.

As an alternative, I've been looking into using Avro only for the Flink
buffers. Basically, as a first step, we'd still be sending JSON messages
through Kafka but we would use a custom TypeSerializer that converts the
case classes to bytes using Avro and vice versa. Unfortunately,
documentation is really scarce. 

In a different topic,
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Serialization-performance-td12019.html,
it says that Avro is a bit of an odd citizen and that the AvroSerializer
provided by Flink uses Kryo. This confirms what I've found by going through
the source code of Flink myself. 

I hope that you can provide me with some pointers. Is extending
TypeSerializer[T] the best way forward if we only want to use Avro for state
buffers and thus utilize Avro's schema migration facilities?

Any pointers would be greatly appreciated!

Kind regards

Marc




Re: Savepoints and migrating value state data types

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi!

The exception you ran into indicates that, on restore of the savepoint, the
serializer for that registered state in the savepoint no longer exists. This
prevents restoring savepoints taken with memory-backed state backends,
because there is no serializer available to deserialize the state at restore
time.

My current guess is that the serializer generated for case class B1 has a
different generated name in your modified, repackaged job, so it is as if
the previous serializer no longer exists on the classpath.

The serializer generation for Scala case classes (the
`createTypeInformation[B1]` call) depends on some Scala macros, and the
resulting generated anonymous class is sensitive to quite a few factors.
Could you perhaps try to verify this by checking the classname of the
generated serializers (you can get the serializer by
`createTypeInformation[B1].getSerializer(new Configuration())`)?
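The hypothesis about compiler-generated names can be seen in miniature with any Scala anonymous classes, independent of Flink: their names are positional (e.g. `...$$anon$1`, `...$$anon$2`), so adding or reordering definitions in a file shifts them between compilations:

```scala
object NamingDemo {
  // Two structurally identical anonymous classes still get distinct,
  // position-dependent compiler-generated names.
  val first  = new Runnable { def run(): Unit = () }
  val second = new Runnable { def run(): Unit = () }
}

// Prints names such as NamingDemo$$anon$1 and NamingDemo$$anon$2
// (exact suffixes depend on compilation order).
println(NamingDemo.first.getClass.getName)
println(NamingDemo.second.getClass.getName)
```

If the serializer for B1 is such an anonymous class, recompiling the job with B2 added could shift its name the same way, matching the restore failure.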

If they are different for the same case class across 2 different
compilations of your job (one with the B2 case class, one without), then my
assumption is correct. Otherwise, we would of course need to look deeper.
I'll also think about how best to work around this for now, and will get
back to you.

Cheers,
Gordon


