Posted to issues@flink.apache.org by "Jaya Ananthram (Jira)" <ji...@apache.org> on 2022/07/10 13:35:00 UTC

[jira] [Comment Edited] (FLINK-28467) Flink Table API Avro format fails during schema evolution

    [ https://issues.apache.org/jira/browse/FLINK-28467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17564268#comment-17564268 ] 

Jaya Ananthram edited comment on FLINK-28467 at 7/10/22 1:34 PM:
-----------------------------------------------------------------

*Root cause:* It looks like the byte buffer is not cleared properly when a message contains a new field that is not in the consumer's schema. As a result, the bytes decoded for the 7th message are a partial 6th message (technically, the leftover new field) followed by a partial 7th message. This happens because the [binaryDecoder|https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java#L165] object is reused across messages. If I create a new binaryDecoder object for each message, it works fine locally. I am not 100% sure, though, hence I am posting this as a comment.

 

*Note:* The test case should cover at least two messages after the schema change, because with the current bug the first message after the change is still parsed successfully.
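The mechanism described in this comment can be modelled without Avro or Flink. The following is a toy sketch only, assuming a decoder that reads ahead into a private buffer (as Avro's buffered binary decoder does) and an input stream whose backing array is swapped for each message. `SwappableStream`, `ReadAheadDecoder`, the two long fields, and the values are all hypothetical; they are not Flink's or Avro's actual classes. The sketch replays the reported scenario (five 2-field messages, then five 3-field messages, all read with the 2-field schema), once with a shared decoder and once with a fresh decoder per message.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

public class DecoderReuseDemo {

    /** Hypothetical stand-in for a byte-array stream whose backing array is swapped per message. */
    static final class SwappableStream {
        private byte[] buf = new byte[0];
        private int pos = 0;

        void setBuffer(byte[] b) { buf = b; pos = 0; }

        /** Copies up to len bytes into dst and returns the count, or -1 at end of input. */
        int read(byte[] dst, int off, int len) {
            if (pos >= buf.length) return -1;
            int n = Math.min(len, buf.length - pos);
            System.arraycopy(buf, pos, dst, off, n);
            pos += n;
            return n;
        }
    }

    /** Toy decoder that reads ahead into a private buffer, like a buffered binary decoder. */
    static final class ReadAheadDecoder {
        private final SwappableStream in;
        private final byte[] buffer = new byte[64];
        private int pos = 0, limit = 0;

        ReadAheadDecoder(SwappableStream in) { this.in = in; }

        private int nextByte() {
            if (pos == limit) { // refill only once the private buffer is exhausted
                int n = in.read(buffer, 0, buffer.length);
                if (n <= 0) throw new IllegalStateException("end of input");
                pos = 0;
                limit = n;
            }
            return buffer[pos++] & 0xFF;
        }

        /** Reads a zig-zag varint long, the integer encoding of Avro's binary format. */
        long readLong() {
            long raw = 0;
            int shift = 0, b;
            do {
                b = nextByte();
                raw |= (long) (b & 0x7F) << shift;
                shift += 7;
            } while ((b & 0x80) != 0);
            return (raw >>> 1) ^ -(raw & 1);
        }
    }

    /** Encodes each field as a zig-zag varint long. */
    static byte[] encode(long... fields) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (long v : fields) {
            long n = (v << 1) ^ (v >> 63);
            while ((n & ~0x7FL) != 0) {
                out.write((int) ((n & 0x7F) | 0x80));
                n >>>= 7;
            }
            out.write((int) n);
        }
        return out.toByteArray();
    }

    /** Messages 1-5 use 2-field "schema X"; messages 6-10 use 3-field "schema Y". */
    static List<byte[]> scenario() {
        List<byte[]> msgs = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            // For Y, the trailing 0 models the union index of a null optional field.
            msgs.add(i <= 5 ? encode(i, i * 3) : encode(i, i * 3, 0));
        }
        return msgs;
    }

    /** Reads only the two fields of schema X from every message, as "id,value". */
    static List<String> readAll(List<byte[]> msgs, boolean reuseDecoder) {
        SwappableStream stream = new SwappableStream();
        ReadAheadDecoder shared = new ReadAheadDecoder(stream);
        List<String> decoded = new ArrayList<>();
        for (byte[] m : msgs) {
            stream.setBuffer(m);
            // A reused decoder may still hold unread bytes of the previous message.
            ReadAheadDecoder d = reuseDecoder ? shared : new ReadAheadDecoder(stream);
            long id = d.readLong();
            long value = d.readLong();
            decoded.add(id + "," + value);
        }
        return decoded;
    }

    public static void main(String[] args) {
        System.out.println("shared decoder: " + readAll(scenario(), true));
        System.out.println("fresh decoder:  " + readAll(scenario(), false));
    }
}
```

With the shared decoder, messages 1 to 6 decode correctly, but message 7 decodes as `0,7`: the leftover new-field byte of message 6 is read as the first field, and the second field comes from the start of message 7. A fresh decoder per message decodes all ten correctly, because the trailing new-field bytes are simply never read. In the reported stack trace, the misaligned bytes are instead read as a string length, which surfaces as the negative-length exception rather than silently wrong numbers.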



> Flink Table API Avro format fails during schema evolution
> ---------------------------------------------------------
>
>                 Key: FLINK-28467
>                 URL: https://issues.apache.org/jira/browse/FLINK-28467
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.14.4
>            Reporter: Jaya Ananthram
>            Priority: Major
>         Attachments: image-2022-07-08-15-01-49-648.png
>
>
> It looks like the Flink Table API fails during schema evolution, i.e., when we add a new optional field on the producer side, the consumer (Flink Table API) fails to parse the messages using the old schema. The following steps reproduce the issue:
>  # Create a schema X with two fields
>  # Send five messages to Kafka using schema X
>  # Update schema X by adding one new optional field with default NULL (at the last position); call the updated schema Y
>  # Send five messages to Kafka using schema Y
>  # Create a Flink SQL job to consume all 10 messages using schema X (with two fields)
>  # It fails exactly at the 7th message with the exception *_Malformed data. Length is negative: -xx_* (the 6th message is parsed successfully, though).
> The complete stack trace is below:
> {code:java}
> Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -56
> 	at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:285)
> 	at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:208)
> 	at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:469)
> 	at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:459)
> 	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191)
> 	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> 	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
> 	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
> 	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
> 	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> 	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> 	at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:142)
> 	at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
> 	... 19 more
> {code}
>  
> According to the [Avro specification|https://avro.apache.org/docs/1.10.0/spec.html] (see the section "Single object Encoding" or the attached image), it looks like Avro supports schema evolution by default, so as an end user I expect the Flink Table API to provide the same functionality. I can also confirm that the old schema is able to decode all 10 messages outside of Flink (i.e., using a simple hello-world Avro deserializer).
> I am adding the root cause as a comment, as I am not exactly sure whether my finding is correct.
> *Note:* I am marking this ticket as Major because the ability to ignore parse failures is still missing (tracked in another open ticket, here). This means that even if users are willing to lose some messages (when a batch contains both types of messages), they still can't set a property to skip them (and then update the DDL once the switch-over messages have been consumed). So it looks like the Avro Table API can't be used in production when we expect the schema to change, which I assume is the case for most use cases. If the severity is not correct, please feel free to lower the priority.
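For comparison, the schema evolution the report relies on is Avro's schema resolution: a record is decoded with the writer's schema, reader fields are then matched by name, writer-only fields are ignored, and reader-only fields take their declared default (single-object encoding carries a fingerprint of the writer schema so that a consumer can look it up). The toy sketch below models only that record rule, assuming every field is a long; `resolve`, the field names `id`/`value`/`extra`, and the list/map schema representation are hypothetical and are not Avro's `ResolvingDecoder` API.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SchemaResolutionSketch {

    /** Reads one zig-zag varint long from data at pos[0] and advances pos[0]. */
    static long readLong(byte[] data, int[] pos) {
        long raw = 0;
        int shift = 0, b;
        do {
            b = data[pos[0]++] & 0xFF;
            raw |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (raw >>> 1) ^ -(raw & 1);
    }

    /**
     * Toy model of Avro record resolution for records whose fields are all longs:
     * decode with the writer's field list, then match reader fields by name.
     */
    static Map<String, Long> resolve(byte[] data, List<String> writerFields,
                                     List<String> readerFields, Map<String, Long> readerDefaults) {
        int[] pos = {0};
        Map<String, Long> written = new LinkedHashMap<>();
        for (String field : writerFields) {
            // Every writer field is consumed, including ones the reader does not know.
            written.put(field, readLong(data, pos));
        }
        Map<String, Long> result = new LinkedHashMap<>();
        for (String field : readerFields) {
            if (written.containsKey(field)) {
                result.put(field, written.get(field));        // present in both schemas
            } else if (readerDefaults.containsKey(field)) {
                result.put(field, readerDefaults.get(field)); // reader-only field: use its default
            } else {
                throw new IllegalArgumentException("no value or default for field " + field);
            }
        }
        return result; // writer-only fields are simply not copied
    }

    public static void main(String[] args) {
        byte[] record = {14, 42, 0}; // id=7, value=21, extra=0, all zig-zag encoded
        System.out.println(resolve(record, List.of("id", "value", "extra"), List.of("id", "value"), Map.of()));
    }
}
```

Reading the 3-field record `{14, 42, 0}` with the 2-field reader schema yields `{id=7, value=21}`, and all three bytes are consumed because the writer schema states how many fields follow. Reading a 2-field record with a 3-field reader schema instead fills `extra` from the reader's default.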



--
This message was sent by Atlassian Jira
(v8.20.10#820010)