Posted to issues@beam.apache.org by "Dennis Yung (Jira)" <ji...@apache.org> on 2020/08/19 05:41:00 UTC

[jira] [Created] (BEAM-10759) KafkaIO with Avro deserializer fails with evolved schema

Dennis Yung created BEAM-10759:
----------------------------------

             Summary: KafkaIO with Avro deserializer fails with evolved schema
                 Key: BEAM-10759
                 URL: https://issues.apache.org/jira/browse/BEAM-10759
             Project: Beam
          Issue Type: Bug
          Components: io-java-kafka
    Affects Versions: 2.23.0
            Reporter: Dennis Yung
            Assignee: Dennis Yung


When using KafkaIO with ConfluentSchemaRegistryDeserializerProvider, an exception can be thrown when consuming a topic whose schema has evolved.

This is because, when the DeserializerProvider is initialized, it creates an AvroCoder instance using either the latest Avro schema by default, or a specific version if one is provided.

If the Kafka topic contains records written with multiple schema versions, AvroCoder will fail to encode records whose schema differs from the one it was configured with. The specific exception depends on the schema change; for example, I have encountered type cast and null pointer errors.
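
For context, a minimal sketch of a pipeline that can hit this (broker address, topic, subject, and registry URL below are placeholders):
{code:java}
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringDeserializer;

public class EvolvedSchemaRepro {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    p.apply(KafkaIO.<String, GenericRecord>read()
        .withBootstrapServers("broker:9092")            // placeholder
        .withTopic("my-topic")                          // placeholder
        .withKeyDeserializer(StringDeserializer.class)
        // Resolves ONE schema version (latest by default) and builds an
        // AvroCoder from it; records written with other versions can fail.
        .withValueDeserializer(
            ConfluentSchemaRegistryDeserializerProvider.of(
                "http://schema-registry:8081",          // placeholder
                "my-topic-value")));                    // subject name
    p.run();
  }
}
{code}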

To fix this issue, we can make use of Avro's writer/reader schema resolution to deserialize Kafka records to the same schema as the AvroCoder's. The following method is available in io.confluent.kafka.serializers.KafkaAvroDeserializer:
{code:java}
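    // Resolves the record's writer schema (looked up from the registry)
    // against the supplied reader schema, so the returned record always
    // matches readerSchema.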
    public Object deserialize(String s, byte[] bytes, Schema readerSchema) {
        return this.deserialize(bytes, readerSchema);
    }
{code}
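
One possible shape for the fix, sketched below: wrap KafkaAvroDeserializer so that every record is resolved to the single reader schema the AvroCoder was built with. The class and constructor here are illustrative only, not existing Beam API:
{code:java}
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Deserializer;

// Illustrative wrapper (not existing Beam code): pins the reader schema.
public class ReaderSchemaAvroDeserializer implements Deserializer<GenericRecord> {
  private final KafkaAvroDeserializer inner = new KafkaAvroDeserializer();
  private final Schema readerSchema;

  public ReaderSchemaAvroDeserializer(Schema readerSchema) {
    this.readerSchema = readerSchema;
  }

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    inner.configure(configs, isKey);
  }

  @Override
  public GenericRecord deserialize(String topic, byte[] bytes) {
    // Avro resolves the record's writer schema against readerSchema,
    // so the output always matches what the AvroCoder expects.
    return (GenericRecord) inner.deserialize(topic, bytes, readerSchema);
  }

  @Override
  public void close() {
    inner.close();
  }
}
{code}
ConfluentSchemaRegistryDeserializerProvider could then build such a wrapper with the same schema it already resolves when constructing the coder.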

--
This message was sent by Atlassian Jira
(v8.3.4#803005)