Posted to issues@flink.apache.org by "Peter Schrott (Jira)" <ji...@apache.org> on 2021/10/15 12:05:00 UTC

[jira] [Comment Edited] (FLINK-24544) Failure when using Kafka connector in Table API with Avro and Confluent schema registry

    [ https://issues.apache.org/jira/browse/FLINK-24544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17429243#comment-17429243 ] 

Peter Schrott edited comment on FLINK-24544 at 10/15/21, 12:04 PM:
-------------------------------------------------------------------

I was wondering about the message of the exception: 
{noformat}
Found org.example.MyEnumType, expecting union{noformat}
It seems that the enum is not the cause here, so I stripped the example Avro schema down to:
{noformat}
@namespace("org.example")
protocol MyProtocl {
  enum MyEnumType {
    TypeVal1, TypeVal2
  }
  record MyRecord {
    MyEnumType type;
  }
}{noformat}
and adjusted the source table SQL accordingly:
{noformat}
CREATE TABLE my_table_from_kafka
(
  `type` STRING
) WITH ( ... ){noformat}
then I get the same exception:
{noformat}
Caused by: org.apache.avro.AvroTypeException: Found org.example.MyEnumType, expecting union{noformat}
This union exception is somewhat understandable: in the Avro schema the field "type" is defined as not nullable, but in Flink Table SQL it is nullable. If you add "NOT NULL" so that Avro and SQL match:
{noformat}
CREATE TABLE my_table_from_kafka
(
  `type` STRING NOT NULL
) WITH ( ... ){noformat}
Avro then complains that it cannot deserialise the enum to a string:
{noformat}
Caused by: org.apache.avro.AvroTypeException: Found org.example.MyEnumType, expecting string{noformat}

But if I change the Avro schema so that the attribute "type" is actually nullable (a union of null and MyEnumType):
{noformat}
record MyRecord {
  union { null, MyEnumType } type;
}{noformat}
and leave the SQL without the "NOT NULL" (so Avro and SQL again agree on the type of `type`, both nullable), I still get the union exception, which I did not expect. I have no logical explanation for that.
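
One possible reading of the three messages, sketched as a simplified Python model of Avro's writer/reader schema resolution. It is not the Avro library code. It assumes that Flink derives the reader schema from the table DDL and maps a nullable STRING column to the union {null, string}; the helper names (resolve, matches, full_name) and the list/tuple encoding of schemas are made up for illustration and are not Avro or Flink APIs.

```python
# Simplified model of Avro writer/reader schema resolution.
# Assumption: a schema is a primitive name ("null", "string"),
# a tuple ("enum", full_name), or a list of schemas (a union).

ENUM = ("enum", "org.example.MyEnumType")


def full_name(schema):
    """Name as it would appear in the error message."""
    if isinstance(schema, list):
        return "union"
    if isinstance(schema, tuple):
        return schema[1]
    return schema


def matches(writer, reader):
    """Non-union schemas match only if they are identical here.
    Avro's type promotions (int -> long, ...) are left out on purpose."""
    return writer == reader


def resolve(writer, reader):
    """Return the errors that decoding could raise, one per failing writer branch."""
    if isinstance(writer, list):
        # Writer union: every branch is resolved against the reader schema.
        errors = []
        for branch in writer:
            errors.extend(resolve(branch, reader))
        return errors
    if isinstance(reader, list):
        # Reader union: some branch must match the writer schema.
        if any(matches(writer, branch) for branch in reader):
            return []
        return [f"Found {full_name(writer)}, expecting {full_name(reader)}"]
    if matches(writer, reader):
        return []
    return [f"Found {full_name(writer)}, expecting {full_name(reader)}"]


# Case 1: Avro enum (not nullable), SQL `type` STRING (assumed union {null, string})
case1 = resolve(ENUM, ["null", "string"])
# Case 2: Avro enum (not nullable), SQL `type` STRING NOT NULL
case2 = resolve(ENUM, "string")
# Case 3: Avro union {null, MyEnumType}, SQL `type` STRING
case3 = resolve(["null", ENUM], ["null", "string"])

for label, errors in (("case 1", case1), ("case 2", case2), ("case 3", case3)):
    print(label, errors)
```

Under these assumptions the third case still fails because the enum branch of the writer's union finds no matching branch in the reader's {null, string} union. Matching nullability alone would then not be enough, and the message again names the union. In this model the error belongs to the enum branch only, so it would surface for records that carry a non-null enum value.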


was (Author: peter.schrott):
I was wondering about the message of the exception: 
{noformat}
Found org.example.MyEnumType, expecting union{noformat}
It seems that the enum is not the cause here. So I stripped down the example avro like:
{noformat}
@namespace("org.example")
protocol MyProtocl {
  enum MyEnumType {
    TypeVal1, TypeVal2
  }
  record MyRecord {
    MyEnumType type;
  }
}{noformat}
and the source table sql accordingly:
{noformat}
CREATE TABLE my_table_from_kafka
(
  `type` STRING
) WITH ...{noformat}
then I get the same exception:
{noformat}
Caused by: org.apache.avro.AvroTypeException: Found org.example.MyEnumType, expecting union{noformat}
This union-exception is somehow understandable as in the avro the field "type" is defined "not nullable" but in flink table SQL it's nullable, so if you add "NOT NULL" to the SQL like, 
{noformat}
CREATE TABLE my_table_from_kafka
(
  `type` STRING NOT NULL
) WITH ...{noformat}
avro is actually complaining about not being able to deserialise the enum to string:
{noformat}
Caused by: org.apache.avro.AvroTypeException: Found org.example.MyEnumType, expecting string{noformat}
But if I change the avro such that the attribute "type" is actually nullable (union type of null and MyEnumType):
{noformat}
record MyRecord {
  union { null, MyEnumType } type;
}{noformat}
and leave the SQL without the "NOT NULL" (also nullable) I still get the union-exception unexpectedly. I have no logical answer to that.

> Failure when using Kafka connector in Table API with Avro and Confluent schema registry 
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-24544
>                 URL: https://issues.apache.org/jira/browse/FLINK-24544
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / Ecosystem
>    Affects Versions: 1.13.1
>            Reporter: Francesco Guardiani
>            Priority: Major
>         Attachments: flink-deser-avro-enum.zip
>
>
> A user reported in the [mailing list|https://lists.apache.org/thread.html/re38a07f6121cc580737a20c11574719cfe554e58d99817f79db9bb4a%40%3Cuser.flink.apache.org%3E] that Avro deserialization fails when using Kafka, Avro and Confluent Schema Registry:  
> {code:java}
> Caused by: java.io.IOException: Failed to deserialize Avro record.
>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)  
>   at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>   at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
>   at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179) 
>   at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
> Caused by: org.apache.avro.AvroTypeException: Found my.type.avro.MyEnumType, expecting union
>   at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
>   at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
>   at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>   at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>   at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
>   at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>   at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:81)
>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
>   ... 9 more
> {code}
> Look in the attachments for a reproducer.
> The same data serialized to a file works fine (see the filesystem example in the reproducer).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)