You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by "Tzu-Li (Gordon) Tai" <tz...@apache.org> on 2019/05/14 08:43:09 UTC

Re: Avro state migration using Scala in Flink 1.7.2 (and 1.8)

Hi Marc!

I know we talked offline about the issues mentioned in this topic already,
but I'm just relaying the result of the discussions here to make it
searchable by others bumping into the same issues.

On Thu, Mar 21, 2019 at 4:27 PM Marc Rooding <ma...@webresource.nl> wrote:

> Hi
>
> I’ve been trying to get state migration with Avro working on Flink 1.7.2
> using Scala case classes but I’m not getting anywhere closer to solving it.
>
> We’re using the most basic streaming WordCount example as a reference to
> test the schema evolution:
>
> val wordCountStream: DataStream[WordWithCount] = text
>   .flatMap { w => w.split("\\s") }
>   .map { w => WordWithCount(w, 1) }
>   .keyBy(_.word)
>   .reduce((a, b) => WordWithCount(a.word, a.count + b.count))
>
>
> In this example, WordWithCount is our data object that we’d like to have
> serialized and deserialized with schema evolution support since keyBy
> maintains state.
>
> I understood from the documentation that it would only work for classes
> generated from Avro schema’s so I’ve tried using sbt-avrohugger to generate
> our case classes. However, for normal case classes generated by Avro we
> quickly ran into the problem that we needed a no-arg constructor.
>
> We looked at the flink-avro module and noticed that the classes generated
> by the avro-maven-plugin were implementing SpecificRecord and seemed to
> comply with the POJO rules as described in the Flink documentation. After
> switching from normal to specific avro generation with sbt-avrohugger, we
> ended up with Scala case classes that should comply with all rules.
>
> An example of such a generated case class is as follows:
>
> /** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */
> import scala.annotation.switch
>
> case class WordWithCount(var word: String, var count: Long) extends org.apache.avro.specific.SpecificRecordBase {
>   def this() = this("", 0L)
>   def get(field$: Int): AnyRef = {
>     (field$: @switch) match {
>       case 0 => {
>         word
>       }.asInstanceOf[AnyRef]
>       case 1 => {
>         count
>       }.asInstanceOf[AnyRef]
>       case _ => new org.apache.avro.AvroRuntimeException("Bad index")
>     }
>   }
>   def put(field$: Int, value: Any): Unit = {
>     (field$: @switch) match {
>       case 0 => this.word = {
>         value.toString
>       }.asInstanceOf[String]
>       case 1 => this.count = {
>         value
>       }.asInstanceOf[Long]
>       case _ => new org.apache.avro.AvroRuntimeException("Bad index")
>     }
>     ()
>   }
>   def getSchema: org.apache.avro.Schema = WordWithCount.SCHEMA$
> }
>
> object WordWithCount {
>   val SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"WordWithCount\",\"fields\":[{\"name\":\"word\",\"type\":\"string\"},{\"name\":\"count\",\"type\":\"long\"}]}")
> }
>
>
> This, however, also didn’t work out of the box. We then tried to define
> our own type information using flink-avro’s AvroTypeInfo but this fails
> because Avro looks for a SCHEMA$ property (SpecificData:285) in the class
> and is unable to use Java reflection to identify the SCHEMA$ in the Scala
> companion object.
>

This is now tracked by https://issues.apache.org/jira/browse/FLINK-12501.
This is really a problem with Avro's SpecificData#getSchema() method not
working well with a specific 3rd library implementation like avrohugger.
Either this is fixed in avrohugger, or we work-around this by explicitly
handling the case.

> implicit val wordWithCountInfo: AvroTypeInfo[WordWithCount] = new AvroTypeInfo(classOf[WordWithCount])
>
> We then read in the 1.7 documentation that Flink doesn’t natively support
> POJO types, but only state defined by descriptors, like f.e. the
> ListStateDescriptor, and only if you allow Flink to infer the type
> information. This is definitely what we need for our processors that have
> map and list state. However, for the simple word count example, we should
> only need native POJO (de)serialization with state migration.
>
> We then noticed Github PR #7759 that adds support for POJO state schema
> evolution/migration. We wanted to give this a try and built flink from
> source from the release-1.8 branch. We then included the 1.8-SNAPSHOT jars
> in our job and got a local 1.8 cluster and job running fine.
>
> However, if we do not specify our own type information, and perform the
> following steps:
>
>
>    1. Run the job
>    2. Create a savepoint and stop the job
>    3. Update the WordWithCount avro schema to include a third field
>    4. Update the job according to the generated case class
>    5. Run the new job from the savepoint
>
>
> We are then faced with the following error:
>
> Caused by: java.lang.IllegalArgumentException: array is not of length 3
> thrown from ScalaCaseClassSerializer.scala:50
>

I think there is a misunderstanding here.
Scala case classes are not considered as POJOs by Flink. Since 1.8, Flink
does support schema evolution for POJOs, but not for Scala case classes yet.
That would explain the mismatching array length error message you got,
since you added a new field to the case class.


>
> However, if we again try to define our own type information using the
> AvroTypeInfo class, we are faced with the same issue as in 1.7.
>
> What are we missing? The documentation on how to use this is very limited,
> and we’re getting the idea that it may work with Java types, but maybe not
> with Scala case classes. I’d love to hear some pointers on how to approach
> this? Compared to our solution in 1.4 (
> https://medium.com/wbaa/evolve-your-data-model-in-flinks-state-using-avro-f26982afa399),
> we hoped to get rid of all the custom serializers by moving to 1.7
>

> Thanks in advance!
>
> Marc
>

So, to conclude this:

- With FLINK-12501 fixed, Avro + Scala case classes generated using
Avrohugger should work. Would be great if you can send me a minimal project
to reproduce your error.
- Schema evolution for case classes is not supported, so using case classes
directly at the moment would not work. This would be a much bigger effort
compared to the above, so I can't be certain when this will be supported.
- Without any of the above, custom serializers is still the way to go right
now if you want to have case classes + schema evolution. WIth the new
serializer snapshot abstractions in 1.8, it would still be easy to remove
them completely and migrate to Flink-shipped serializers eventually, once
they support this scenario.
- The documentation should probably clarify things a bit more regarding
support in Java v.s. Scala, and case classes / POJO.

Cheers,
Gordon