Posted to user@flink.apache.org by Dongwon Kim <ea...@gmail.com> on 2021/08/09 10:26:19 UTC

How to deserialize Avro enum type in Flink SQL?

Hi community,

I have a Kafka topic where the schema of its values is defined by the
"MyRecord" record in the following Avro IDL and registered to the Confluent
Schema Registry.

> @namespace("my.type.avro")
> protocol MyProtocol {
>   enum MyEnumType {
>     TypeVal1, TypeVal2
>   }
>   record MyEntry {
>     MyEnumType type;
>   }
>   record MyRecord {
>     array<MyEntry> entries;
>   }
> }


To read from the topic, I've defined the following DDL:

> CREATE TABLE my_table (
>     `entries` ARRAY<ROW<
>         *`type` ??? (This is the main question)*
>     >>
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = 'my-topic',
>     'properties.bootstrap.servers' = '...:9092',
>     'scan.startup.mode' = 'latest-offset',
>     'value.format' = 'avro-confluent',
>     'value.avro-confluent.schema-registry.url' = 'http://...:8081'
> )


And I run the following query:

> SELECT * FROM my_table


I get the following exception in Flink 1.13.1 when I use *STRING* for
the type:

> *Caused by: java.io.IOException: Failed to deserialize Avro record.*
>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
>   at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>   at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
>   at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
>   at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
> *Caused by: org.apache.avro.AvroTypeException: Found my.type.avro.MyEnumType, expecting union*
>   at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
>   at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
>   at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>   at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>   at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
>   at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>   at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:81)
>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
>   ... 9 more

The reason I use the STRING type is just for fast-prototyping.

While reading through [1], I've been thinking about using *RAW('class',
'snapshot')* where 'class' is my.type.avro.MyEnumType, but I'm not sure
whether that is a good idea and, if so, what the value of the snapshot should be.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/#raw

Thanks in advance,

Dongwon

Re: Fwd: How to deserialize Avro enum type in Flink SQL?

Posted by Peter Schrott <pe...@googlemail.com>.
Hi people,

I found a workaround for that issue - which works at least for my use case.

The main idea was to customize "org.apache.flink.formats.avro.registry.confluent.RegistryAvroFormatFactory" so that the expected Avro schema is not derived from the CREATE TABLE SQL statement but passed in as a parameter. This results in matching schemas (actual and expected) when the record is deserialized under the hood by the Avro library.
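
To illustrate the mismatch between the actual and the expected schema, here is a toy model in plain Java. It is only a sketch and not Avro's implementation: the class SchemaResolutionSketch, its method names, and the representation of schema types as plain strings are made up for illustration. It assumes that a nullable STRING column is turned into the expected union [null, string], and it applies the rule from the Avro specification that a non-union actual type must match one branch of the expected union.

```java
import java.util.Arrays;
import java.util.List;

// Toy model of Avro schema resolution; NOT Avro's real API.
class SchemaResolutionSketch {

    // Returns the first branch of the expected (reader) union that matches the
    // actual (writer) type. Simplification: types match only if they are equal.
    static String resolve(String actualType, List<String> expectedUnion) {
        for (String branch : expectedUnion) {
            if (branch.equals(actualType)) {
                return branch;
            }
        }
        throw new IllegalStateException("Found " + actualType + ", expecting union");
    }

    public static void main(String[] args) {
        // Assumption: a nullable STRING column becomes the expected union [null, string].
        List<String> fromDdl = Arrays.asList("null", "string");
        // Expected schema equal to the registered one, modeled as a single-branch list.
        List<String> fromRegistry = Arrays.asList("enum");

        System.out.println(resolve("enum", fromRegistry));
        try {
            resolve("enum", fromDdl);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With the expected schema taken from the registry the enum resolves; with the expected schema derived from the DDL no branch matches, which corresponds to the "Found my.type.avro.MyEnumType, expecting union" message in the stack trace.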

This gives me the freedom to create a source table that exactly matches the records in Kafka. In my case the CREATE TABLE SQL statement is generated from the Avro schema, which is the other way round compared to the current implementation.
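
Generating the column list from a schema could look roughly like the following plain-Java sketch. It is not the code from the linked repository: the Node classes are a hypothetical, minimal stand-in for an Avro schema tree (not org.apache.avro.Schema), and mapping enums to STRING is an assumption of this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Toy schema model for illustration; NOT org.apache.avro.Schema.
class DdlFromSchemaSketch {

    static class Node {}
    static final class StringNode extends Node {}
    static final class EnumNode extends Node {}
    static final class ArrayNode extends Node {
        final Node element;
        ArrayNode(Node element) { this.element = element; }
    }
    static final class RecordNode extends Node {
        final Map<String, Node> fields = new LinkedHashMap<>();
        RecordNode field(String name, Node type) {
            fields.put(name, type);
            return this;
        }
    }

    // Maps a schema node to a Flink SQL type. Assumption: enums are exposed as STRING.
    static String toSqlType(Node node) {
        if (node instanceof StringNode || node instanceof EnumNode) {
            return "STRING";
        }
        if (node instanceof ArrayNode) {
            return "ARRAY<" + toSqlType(((ArrayNode) node).element) + ">";
        }
        if (node instanceof RecordNode) {
            StringJoiner row = new StringJoiner(", ", "ROW<", ">");
            for (Map.Entry<String, Node> f : ((RecordNode) node).fields.entrySet()) {
                row.add("`" + f.getKey() + "` " + toSqlType(f.getValue()));
            }
            return row.toString();
        }
        throw new IllegalArgumentException("Unsupported node: " + node.getClass());
    }

    // Renders the column list of a CREATE TABLE statement from the top-level record.
    static String toColumns(RecordNode top) {
        StringJoiner cols = new StringJoiner(",\n");
        for (Map.Entry<String, Node> f : top.fields.entrySet()) {
            cols.add("    `" + f.getKey() + "` " + toSqlType(f.getValue()));
        }
        return cols.toString();
    }

    public static void main(String[] args) {
        // Mirrors MyRecord from the Avro IDL in the original question.
        RecordNode myEntry = new RecordNode().field("type", new EnumNode());
        RecordNode myRecord = new RecordNode().field("entries", new ArrayNode(myEntry));
        // Connector options are left out of this sketch.
        System.out.println("CREATE TABLE my_table (\n" + toColumns(myRecord) + "\n) WITH (...)");
    }
}
```

Declaring the enum column as STRING only works here because the customized format uses the registered schema as the expected schema; with the stock format the same DDL leads to the exception from the original question.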

Please compare: https://github.com/peterschrott/flinkDeserAvroEnum

Maybe this helps someone.

Best, Peter


On 2021/10/12 16:18:30 Dongwon Kim wrote:
> Hi community,
> 
> Can I get advice on this question?
> 
> Another user just sent me an email asking whether I found a solution or a
> workaround for this question, but I'm still stuck there.
> 
> Any suggestions?
> 
> Thanks in advance,
> 
> Dongwon

Re: Fwd: How to deserialize Avro enum type in Flink SQL?

Posted by Timo Walther <tw...@apache.org>.
Hi Peter,

don't get confused by the year 2017 in the ticket. We had better Avro
support in the meantime, but it was based on the old type system around
TypeInformation. Now we need to build up this support again for the new
type system. I just found this ticket and thought that the title fits. We
are planning to have better support soon, in either 1.15 or 1.16 at the latest.

Regards,
Timo

On 20.10.21 18:43, Peter Schrott wrote:
> Hi Timo,
> 
> sorry for being the party-pooper here! :O
> 
> My problem with the UDF is that the SQL SELECT will be passed in from
> outside, and the outside world does not know about the UDF.
> 
> As for Utf8, I know that feature; unfortunately the schema is already
> up and running and can't be touched that easily. But this would actually
> be covered by your UDF suggestion.
> 
> Thanks for the update about the open ticket. It has been open since 2017, so
> it does not seem likely to be fixed in the near future. :)
> 
> Best, Peter
> 
> On Wed, Oct 20, 2021 at 5:55 PM Timo Walther <twalthr@apache.org> wrote:
> 
>     Hi Peter,
> 
>     as a temporary workaround I would simply implement a UDF like:
> 
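>     import org.apache.flink.table.annotation.DataTypeHint;
>     import org.apache.flink.table.annotation.InputGroup;
>     import org.apache.flink.table.functions.ScalarFunction;
> 
>     public class EverythingToString extends ScalarFunction {
>         // Accepts any input type and returns its string representation.
>         public String eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
>             return o == null ? null : o.toString();
>         }
>     }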
> 
>     For the Utf8 issue, you can instruct Avro to generate Java classes with
>     String instead using the `avro.java.string` option.
> 
>     The rework of the type system messed up the Avro support in Flink. This
>     is a known issue that is tracked under
> 
>     https://issues.apache.org/jira/browse/FLINK-8183
> 
>     Regards,
>     Timo


Re: Fwd: How to deserialize Avro enum type in Flink SQL?

Posted by Timo Walther <tw...@apache.org>.
Hi Peter,

as a temporary workaround I would simply implement a UDF like:

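import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.ScalarFunction;

public class EverythingToString extends ScalarFunction {
    // Accepts any input type and returns its string representation.
    public String eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
        return o == null ? null : o.toString();
    }
}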
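
What the eval method does for the two problematic types can be tried without Flink. The following plain-Java sketch is an illustration only: EverythingToStringSketch and its nested MyEnumType are hypothetical stand-ins (the real enum would be the Avro-generated class), StringBuilder stands in for Avro's Utf8, which likewise implements CharSequence, and the null check is an addition of this sketch.

```java
// Plain-Java illustration of the UDF body; no Flink classes involved.
class EverythingToStringSketch {

    // Stand-in for the Avro-generated enum from the IDL in the original question.
    enum MyEnumType { TypeVal1, TypeVal2 }

    // Same idea as the UDF's eval method, minus the Flink annotation.
    static String eval(Object o) {
        return o == null ? null : o.toString();
    }

    public static void main(String[] args) {
        // An enum constant's toString() yields its symbol name.
        System.out.println(eval(MyEnumType.TypeVal1));
        // StringBuilder stands in for Utf8; toString() yields the character content.
        CharSequence utf8Like = new StringBuilder("hello");
        System.out.println(eval(utf8Like));
    }
}
```

In SQL the function would be applied to the RAW column after registering it under a name of your choice, so the query author has to know that name.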

For the Utf8 issue, you can instruct Avro to generate Java classes with 
String instead using the `avro.java.string` option.

The rework of the type system messed up the Avro support in Flink. This 
is a known issue that is tracked under

https://issues.apache.org/jira/browse/FLINK-8183

Regards,
Timo

On 20.10.21 17:30, Peter Schrott wrote:
> Hi Timo,
> 
> thanks a lot for your suggestion.
> 
> I also considered this workaround, but when going from the DataStream API to the
> Table API (using the POJO generated by the Maven Avro plugin), types are not
> mapped correctly, especially Utf8 (Avro's implementation of CharSequence) and
> also enums. In the table I then have mostly RAW types, which are not
> handy to perform SQL statements on. It is already discussed here:
> https://www.mail-archive.com/user@flink.apache.org/msg44449.html
> 
> Best, Peter
> 
> On Wed, Oct 20, 2021 at 5:21 PM Timo Walther <twalthr@apache.org> wrote:
> 
>     A current workaround is to use DataStream API to read the data and
>     provide your custom Avro schema to configure the format. Then switch to
>     Table API.
> 
>     StreamTableEnvironment.fromDataStream(...) accepts all data types. Enum
>     classes will be represented as RAW types but you can forward them as
>     blackboxes or convert them in a UDF.
> 
>     We will further improve the support of external types in the Table API
>     type system in the near future.
> 
>     Regards,
>     Timo
> 
>     On 20.10.21 15:51, Peter Schrott wrote:
>      > Hi people!
>      >
>      > I was digging deeper these days and found the "root cause" of the
>      > issue and the difference between reading Avro from files and
>      > reading Avro from Kafka & Schema Registry (SR).
>      >
>      > Please see:
>      > https://lists.apache.org/x/thread.html/r8ad7bd574f7dc4904139295c7de612a35438571c5b9caac673521d22@%3Cuser.flink.apache.org%3E
>      >
>      > The main problem with Kafka & SR is that the
>      > "org.apache.avro.generic.GenericDatumReader" is initialized with an
>      > "expected" schema which is taken from the Flink SQL table
>      > definition. When it comes to deserializing an attribute of
>      > type "enum", it does not match the expected schema, where this
>      > same attribute is typed as "string". Hence the Avro deserializer breaks
>      > here.
>      >
>      > Not sure how to tackle that issue. The functioning of the
>      > "GenericDatumReader" cannot really be changed. A solution could
>      > be to create an analogous reader for reading data based on the SQL DDL.
>      >
>      > Cheers, Peter
>      >
>      > On 2021/10/12 16:18:30 Dongwon Kim wrote:
>      >> Hi community,
>      >>
>      >> Can I get advice on this question?
>      >>
>      >> Another user just sent me an email asking whether I found a
>     solution or a
>      >> workaround for this question, but I'm still stuck there.
>      >>
>      >> Any suggestions?
>      >>
>      >> Thanks in advance,
>      >>
>      >> Dongwon
>      >>
>      >> ---------- Forwarded message ---------
>      >> From: Dongwon Kim <eastcirclek@gmail.com
>     <ma...@gmail.com>>
>      >> Date: Mon, Aug 9, 2021 at 7:26 PM
>      >> Subject: How to deserialize Avro enum type in Flink SQL?
>      >> To: user <user@flink.apache.org <ma...@flink.apache.org>>
>      >>
>      >>
>      >> Hi community,
>      >>
>      >> I have a Kafka topic where the schema of its values is defined
>     by the
>      >> "MyRecord" record in the following Avro IDL and registered to
>     the Confluent
>      >> Schema Registry.
>      >>
>      >>> @namespace("my.type.avro")
>      >>> protocol MyProtocol {
>      >>>    enum MyEnumType {
>      >>>      TypeVal1, TypeVal2
>      >>>    }
>      >>>    record MyEntry {
>      >>>      MyEnumType type;
>      >>>    }
>      >>>    record MyRecord {
>      >>>      array<MyEntry> entries;
>      >>>    }
>      >>> }
>      >>
>      >>
>      >> To read from the topic, I've defined the following DDL:
>      >>
>      >>> CREATE TABLE my_table
>      >>
>      >> (
>      >>>      `entries` ARRAY<ROW<
>      >>>          *`type` ??? (This is the main question)*
>      >>>      >>
>      >>> ) WITH (
>      >>>      'connector' = 'kafka',
>      >>>      'topic' = 'my-topic',
>      >>>      'properties.bootstrap.servers' = '...:9092',
>      >>>      'scan.startup.mode' = 'latest-offset',
>      >>>      'value.format' = 'avro-confluent',
>      >>>      'value.avro-confluent.schema-registry.url' = 'http://...:8081'
>      >>>
>      >> )
>      >>
>      >>
>      >> And I run the following query :
>      >>
>      >>> SELECT * FROM my_table
>      >>
>      >>
>      >> Now I got the following messages in Flink-1.13.1 when I use
>     *STRING* for
>      >> the type:
>      >>
>      >>> *Caused by: java.io.IOException: Failed to deserialize Avro
>     record.*
>      >>>    at
>      >>>
>     org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
>      >>>    at
>      >>>
>     org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
>      >>>    at
>      >>>
>     org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>      >>>    at
>      >>>
>     org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
>      >>>    at
>      >>>
>     org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
>      >>>    at
>      >>>
>     org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>      >>>    at
>      >>>
>     org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>      >>>    at
>      >>>
>     org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>      >>>    at
>      >>>
>     org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>      >>>    at
>      >>>
>     org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
>      >>> *Caused by: org.apache.avro.AvroTypeException: Found
>      >>> my.type.avro.MyEnumType, expecting union*
>      >>>    at
>      >>> org.apache.avro.io
>     <http://org.apache.avro.io>.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
>      >>>    at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
>      >>>    at
>      >>> org.apache.avro.io
>     <http://org.apache.avro.io>.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
>      >>>    at
>      >>>
>     org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>      >>>    at
>      >>>
>     org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
>      >>>    at
>      >>>
>     org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
>      >>>    at
>      >>>
>     org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>      >>>    at
>      >>>
>     org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>      >>>    at
>      >>>
>     org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>      >>>    at
>      >>>
>     org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
>      >>>    at
>      >>>
>     org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>      >>>    at
>      >>>
>     org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>      >>>    at
>      >>>
>     org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>      >>>    at
>      >>>
>     org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>      >>>    at
>      >>>
>     org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:81)
>      >>>    at
>      >>>
>     org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
>      >>>    ... 9 more
>      >>
>      >> The reason I use the STRING type is just for fast-prototyping.
>      >>
>      >> While reading through [1], I've been thinking about using
>     *RAW('class',
>      >> 'snapshot')* where 'class' is my.type.avro.MyEnumType, but I'm
>     not sure
>      >> whether it is a good idea and if so, what can be a value for the
>     snapshot.
>      >>
>      >> [1]
>      >>
>     https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/#raw
>      >>
>      >> Thanks in advance,
>      >>
>      >> Dongwon
>      >>
>      >
> 


Re: Fwd: How to deserialize Avro enum type in Flink SQL?

Posted by Peter Schrott <pe...@googlemail.com>.
Hi Timo,

thanks a lot for your suggestion.

I also considered this workaround, but when going from the DataStream API to
the Table API (using the POJO generated by the Maven Avro plugin), types are
not mapped correctly, especially Utf8 (Avro's implementation of CharSequence)
and enums. The resulting table has mostly RAW types, which are not handy for
running SQL statements. This was already discussed here:
https://www.mail-archive.com/user@flink.apache.org/msg44449.html

Best, Peter

On Wed, Oct 20, 2021 at 5:21 PM Timo Walther <tw...@apache.org> wrote:

> A current workaround is to use DataStream API to read the data and
> provide your custom Avro schema to configure the format. Then switch to
> Table API.
>
> StreamTableEnvironment.fromDataStream(...) accepts all data types. Enum
> classes will be represented as RAW types but you can forward them as
> blackboxes or convert them in a UDF.
>
> We will further improve the support of external types in the Table API
> type system in the near future.
>
> Regards,
> Timo
>

Re: Fwd: How to deserialize Avro enum type in Flink SQL?

Posted by Timo Walther <tw...@apache.org>.
A current workaround is to use DataStream API to read the data and 
provide your custom Avro schema to configure the format. Then switch to 
Table API.

StreamTableEnvironment.fromDataStream(...) accepts all data types. Enum 
classes will be represented as RAW types but you can forward them as 
blackboxes or convert them in a UDF.
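
As a rough illustration of the "convert them in a UDF" idea, the sketch below shows only the conversion step, with no Flink classes involved so that it runs standalone. The class name `RawToStringSketch`, the method `toSqlString`, and the local `MyEnumType` enum are hypothetical stand-ins; in a real job this logic would presumably live in the `eval` method of a user-defined scalar function, which is an assumption about the wiring and not something spelled out in this thread.

```java
/** Standalone sketch of the enum/CharSequence to String conversion a UDF could perform. */
final class RawToStringSketch {

    /** Hypothetical stand-in for the Avro-generated enum from the thread. */
    enum MyEnumType { TypeVal1, TypeVal2 }

    /**
     * Turns a value that would otherwise surface as an opaque RAW column
     * into a plain String that SQL can filter and group on.
     */
    static String toSqlString(Object raw) {
        if (raw == null) {
            return null; // keep SQL NULL semantics
        }
        if (raw instanceof Enum<?>) {
            return ((Enum<?>) raw).name(); // enum symbol, e.g. "TypeVal1"
        }
        if (raw instanceof CharSequence) {
            return raw.toString(); // covers CharSequence implementations such as Avro's Utf8
        }
        throw new IllegalArgumentException("Unsupported type: " + raw.getClass().getName());
    }

    public static void main(String[] args) {
        System.out.println(toSqlString(MyEnumType.TypeVal2));
        System.out.println(toSqlString(new StringBuilder("some text")));
    }
}
```

Returning the enum's symbol name keeps the column comparable against string literals such as 'TypeVal1' in a WHERE clause.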

We will further improve the support of external types in the Table API 
type system in the near future.

Regards,
Timo

On 20.10.21 15:51, Peter Schrott wrote:
> Hi people!
> 
> I was digging deeper these days and found the "root cause" of the issue and the difference between reading Avro from files and reading Avro from Kafka & SR.
> 
> Please see: https://lists.apache.org/x/thread.html/r8ad7bd574f7dc4904139295c7de612a35438571c5b9caac673521d22@%3Cuser.flink.apache.org%3E
> 
> The main problem with Kafka & SR is that the "org.apache.avro.generic.GenericDatumReader" is initialized with an "expected" schema, which is taken from the Flink SQL table definition. When it comes to deserializing an attribute of type "enum", it does not match the expected schema, where this same attribute is typed as "string". Hence the Avro deserializer breaks here.
> 
> Not sure how to tackle that issue. The functioning of the "GenericDatumReader" cannot really be changed. A solution could be to create an analogous reader for reading data based on the SQL DDL.
> 
> Cheers, Peter
> 


Re: Fwd: How to deserialize Avro enum type in Flink SQL?

Posted by Peter Schrott <pe...@googlemail.com>.
Hi people!

I was digging deeper these days and found the "root cause" of the issue and the difference between reading Avro from files and reading Avro from Kafka & SR.

Please see: https://lists.apache.org/x/thread.html/r8ad7bd574f7dc4904139295c7de612a35438571c5b9caac673521d22@%3Cuser.flink.apache.org%3E

The main problem with Kafka & SR is that the "org.apache.avro.generic.GenericDatumReader" is initialized with an "expected" schema, which is taken from the Flink SQL table definition. When it comes to deserializing an attribute of type "enum", it does not match the expected schema, where this same attribute is typed as "string". Hence the Avro deserializer breaks here.

Not sure how to tackle that issue. The functioning of the "GenericDatumReader" cannot really be changed. A solution could be to create an analogous reader for reading data based on the SQL DDL.
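
To make the mismatch concrete, here is a toy model of the part of Avro's schema-resolution rules that matters here. It is a sketch and not the Avro library: `AvroResolutionSketch`, `ToySchema`, and `resolves` are hypothetical names, and only enums, unions, and the string/bytes promotion are modelled. The point it illustrates is that an enum written by the producer only resolves against an enum of the same name on the reading side, never against "string" or a union containing "string".

```java
import java.util.Arrays;
import java.util.List;

/** Toy model of the Avro schema-resolution rules relevant to this thread (not the Avro library). */
final class AvroResolutionSketch {

    /** Drastically simplified stand-in for an Avro schema. */
    static final class ToySchema {
        final String kind;               // "null", "string", "bytes", "enum", "union"
        final String name;               // only used for enums
        final List<ToySchema> branches;  // only used for unions

        private ToySchema(String kind, String name, List<ToySchema> branches) {
            this.kind = kind;
            this.name = name;
            this.branches = branches;
        }

        static ToySchema primitive(String kind) {
            return new ToySchema(kind, null, List.of());
        }

        static ToySchema enumOf(String name) {
            return new ToySchema("enum", name, List.of());
        }

        static ToySchema union(ToySchema... branches) {
            return new ToySchema("union", null, Arrays.asList(branches));
        }
    }

    /** Returns true if all data written with `writer` can be read using `reader`. */
    static boolean resolves(ToySchema writer, ToySchema reader) {
        if (writer.kind.equals("union")) {
            // Avro decides per datum; for all data to decode, every writer branch must be readable.
            return writer.branches.stream().allMatch(b -> resolves(b, reader));
        }
        if (reader.kind.equals("union")) {
            // Some branch of the reader's union must match the writer's schema.
            return reader.branches.stream().anyMatch(b -> resolves(writer, b));
        }
        if (writer.kind.equals("enum")) {
            // An enum only matches an enum with the same name; there is no enum-to-string promotion.
            return reader.kind.equals("enum") && writer.name.equals(reader.name);
        }
        if (writer.kind.equals(reader.kind)) {
            return true;
        }
        // The only promotions modelled here: string <-> bytes.
        return (writer.kind.equals("string") && reader.kind.equals("bytes"))
                || (writer.kind.equals("bytes") && reader.kind.equals("string"));
    }

    public static void main(String[] args) {
        ToySchema written = ToySchema.enumOf("my.type.avro.MyEnumType");
        ToySchema str = ToySchema.primitive("string");
        ToySchema nullableStr = ToySchema.union(ToySchema.primitive("null"), str);

        System.out.println("enum -> string:          " + resolves(written, str));
        System.out.println("enum -> [null, string]:  " + resolves(written, nullableStr));
        System.out.println("enum -> same enum:       " + resolves(written, written));
    }
}
```

Under this model, the only reader schema that accepts the written data is one that itself declares the enum, which is consistent with the idea of a reader that does not take its expected schema from the SQL DDL.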

Cheers, Peter

On 2021/10/12 16:18:30 Dongwon Kim wrote:
> Hi community,
> 
> Can I get advice on this question?
> 
> Another user just sent me an email asking whether I found a solution or a
> workaround for this question, but I'm still stuck there.
> 
> Any suggestions?
> 
> Thanks in advance,
> 
> Dongwon
> 
> ---------- Forwarded message ---------
> From: Dongwon Kim <ea...@gmail.com>
> Date: Mon, Aug 9, 2021 at 7:26 PM
> Subject: How to deserialize Avro enum type in Flink SQL?
> To: user <us...@flink.apache.org>
> 
> 
> Hi community,
> 
> I have a Kafka topic where the schema of its values is defined by the
> "MyRecord" record in the following Avro IDL and registered to the Confluent
> Schema Registry.
> 
> > @namespace("my.type.avro")
> > protocol MyProtocol {
> >   enum MyEnumType {
> >     TypeVal1, TypeVal2
> >   }
> >   record MyEntry {
> >     MyEnumType type;
> >   }
> >   record MyRecord {
> >     array<MyEntry> entries;
> >   }
> > }
> 
> 
> To read from the topic, I've defined the following DDL:
> 
> > CREATE TABLE my_table
> 
> (
> >     `entries` ARRAY<ROW<
> >         *`type` ??? (This is the main question)*
> >     >>
> > ) WITH (
> >     'connector' = 'kafka',
> >     'topic' = 'my-topic',
> >     'properties.bootstrap.servers' = '...:9092',
> >     'scan.startup.mode' = 'latest-offset',
> >     'value.format' = 'avro-confluent',
> >     'value.avro-confluent.schema-registry.url' = 'http://...:8081'
> >
> )
> 
> 
> And I run the following query :
> 
> > SELECT * FROM my_table
> 
> 
> Now I got the following messages in Flink-1.13.1 when I use *STRING* for
> the type:
> 
> > *Caused by: java.io.IOException: Failed to deserialize Avro record.*
> >   at
> > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
> >   at
> > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
> >   at
> > org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
> >   at
> > org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
> >   at
> > org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
> >   at
> > org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
> >   at
> > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
> >   at
> > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
> >   at
> > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
> >   at
> > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
> > *Caused by: org.apache.avro.AvroTypeException: Found
> > my.type.avro.MyEnumType, expecting union*
> >   at
> > org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
> >   at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
> >   at
> > org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
> >   at
> > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
> >   at
> > org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
> >   at
> > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
> >   at
> > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> >   at
> > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
> >   at
> > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> >   at
> > org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
> >   at
> > org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
> >   at
> > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
> >   at
> > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> >   at
> > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> >   at
> > org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:81)
> >   at
> > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
> >   ... 9 more
> 
> The reason I use the STRING type is just for fast-prototyping.
> 
> While reading through [1], I've been thinking about using *RAW('class',
> 'snapshot')* where 'class' is my.type.avro.MyEnumType, but I'm not sure
> whether it is a good idea and if so, what can be a value for the snapshot.
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/#raw
> 
> Thanks in advance,
> 
> Dongwon
> 

Re: Re: How to deserialize Avro enum type in Flink SQL?

Posted by Peter Schrott <pe...@googlemail.com>.
Hi & thanks,

with your solution you are referring to the reported exception:
`Found my.type.avro.MyEnumType, expecting union`

I investigated the "union" part and added "NOT NULL" to the SQL statement, so that the attribute is NOT nullable in both Avro and SQL. This actually "fixed" the reported exception, but the following exception was thrown instead:
`AvroTypeException: Found org.example.MyEnumType, expecting string`
So there is a problem working with enums on input from Kafka combined with the Confluent Schema Registry.

I also tried your suggestion of making the attribute nullable in both Avro and SQL by actually defining it as a union in Avro (which is what the exception says is expected). But the exception did not change. Please compare: https://issues.apache.org/jira/browse/FLINK-24544?focusedCommentId=17429243&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17429243
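
The change from "expecting union" to "expecting string" can be pictured with the small sketch below. It assumes, based on the behaviour observed above, that the expected Avro schema is derived from the SQL column type and that a nullable column becomes a union with "null". The names `ReaderSchemaSketch`, `expectedAvroSchema`, and `expectedKind` are hypothetical, only a handful of SQL types are covered, and the order of the union branches is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch of how a SQL column declaration could translate into the expected Avro schema. */
final class ReaderSchemaSketch {

    // Small subset of the SQL-to-Avro type mapping, enough for this illustration.
    private static final Map<String, String> SQL_TO_AVRO = new LinkedHashMap<>();
    static {
        SQL_TO_AVRO.put("STRING", "string");
        SQL_TO_AVRO.put("BOOLEAN", "boolean");
        SQL_TO_AVRO.put("INT", "int");
        SQL_TO_AVRO.put("BIGINT", "long");
        SQL_TO_AVRO.put("DOUBLE", "double");
    }

    private static String avroTypeOf(String sqlType) {
        String avro = SQL_TO_AVRO.get(sqlType);
        if (avro == null) {
            throw new IllegalArgumentException("Type not covered by this sketch: " + sqlType);
        }
        return avro;
    }

    /** Expected Avro schema as JSON: nullable columns are wrapped in a union with "null". */
    static String expectedAvroSchema(String sqlType, boolean nullable) {
        String quoted = "\"" + avroTypeOf(sqlType) + "\"";
        return nullable ? "[\"null\", " + quoted + "]" : quoted;
    }

    /** Top-level kind of the expected schema, i.e. the word after "expecting" in the error. */
    static String expectedKind(String sqlType, boolean nullable) {
        String avro = avroTypeOf(sqlType);
        return nullable ? "union" : avro;
    }

    public static void main(String[] args) {
        System.out.println("`type` STRING          -> " + expectedAvroSchema("STRING", true)
                + " (expecting " + expectedKind("STRING", true) + ")");
        System.out.println("`type` STRING NOT NULL -> " + expectedAvroSchema("STRING", false)
                + " (expecting " + expectedKind("STRING", false) + ")");
    }
}
```

In both cases the expected schema still contains "string" where the written data has an enum, so NOT NULL only changes which mismatch gets reported.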

Thanks & Best
Peter


On 2021/10/18 08:39:41 Arvid Heise wrote:
> Just as an idea for a workaround as Flink apparently expects the enum field
> to be nullable.
> 
>   record MyEntry {
>     MyEnumType type; <- make that nullable
>   }
> 
> Of course that is only an option if you are able to change the producer.
> 

Re: How to deserialize Avro enum type in Flink SQL?

Posted by Arvid Heise <ar...@apache.org>.
Just as an idea for a workaround as Flink apparently expects the enum field
to be nullable.

  record MyEntry {
    MyEnumType type; <- make that nullable
  }

Of course that is only an option if you are able to change the producer.

Re: How to deserialize Avro enum type in Flink SQL?

Posted by Francesco Guardiani <fr...@ververica.com>.
It reproduces on my machine, so I've opened a JIRA issue for it:
FLINK-24544 <https://issues.apache.org/jira/browse/FLINK-24544>.
Unfortunately, I don't have a ready-to-use workaround for you.

Re: How to deserialize Avro enum type in Flink SQL?

Posted by Dongwon Kim <ea...@gmail.com>.
>
> Can you provide a minimal reproducer (without confluent schema registry)
> with a valid input?
>

Please download and unzip the attached file.

   - src/main/avro/MyProtocol.avdl
      - MyRecord, MyEntry, and MyEnumType are defined here
      - "mvn generate-sources" auto-generates Java classes under
      "target/generated-sources"
   - "org.example.fs" contains
      - "org.example.fs.Writer", which writes a single record of type
      MyRecord to "output.avro"
      - "org.example.fs.Reader", which reads the record from "output.avro"
      - "org.example.fs.ExampleFromFileSystem", which executes the CREATE
      TABLE statement defined in "my_table.ddl" and shows that it successfully
      deserializes MyRecord from an Avro record written to a file, as you
      mentioned.
   - "org.example.kafka.ExampleFromKafkaAndSR" does almost the same as
   "org.example.fs.ExampleFromFileSystem" except that it reads from Kafka and
   looks up the schema from Schema Registry
      - However, unlike ExampleFromFileSystem, it fails with the same
      exception as before
      - What I produced to the Kafka topic is {"entries": [{"type":
      "TypeVal1"}, {"type": "TypeVal2"}, {"type": "TypeVal2"}]}, which is the
      same Avro record that is saved in output.avro.
      - The size of "output.avro" is 321 bytes on disk, while the size of
      the value of the Kafka record is 10 bytes.
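
The 10-byte value is consistent with the Confluent wire format: a zero magic
byte, a 4-byte big-endian schema ID, and then the bare Avro binary body. The
container file is larger because it also embeds the schema and file metadata.
The JDK-only sketch below builds such a value by hand for the three-entry
record above. The class and method names and the schema ID 1 are assumptions
for illustration; this is not the serializer the reproducer uses.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

public class ConfluentWireFormatSketch {

    // Avro writes longs as zigzag-encoded variable-length integers.
    static void writeZigZagLong(ByteArrayOutputStream out, long value) {
        long n = (value << 1) ^ (value >> 63);
        while ((n & ~0x7FL) != 0) {
            out.write((int) ((n & 0x7F) | 0x80));
            n >>>= 7;
        }
        out.write((int) n);
    }

    // Encodes MyRecord{entries=[...]}; each MyEntry is a single non-nullable enum.
    static byte[] encodeRecordBody(int[] enumIndexes) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (enumIndexes.length > 0) {
            writeZigZagLong(out, enumIndexes.length); // item count of the array block
            for (int index : enumIndexes) {
                writeZigZagLong(out, index);          // enum symbol position
            }
        }
        writeZigZagLong(out, 0);                      // end-of-array marker
        return out.toByteArray();
    }

    // Prepends the Confluent header: magic byte 0 and a 4-byte big-endian schema ID.
    static byte[] toConfluentValue(int schemaId, byte[] avroBody) {
        return ByteBuffer.allocate(1 + 4 + avroBody.length)
                .put((byte) 0)
                .putInt(schemaId)
                .put(avroBody)
                .array();
    }

    public static void main(String[] args) {
        // TypeVal1 -> 0, TypeVal2 -> 1
        byte[] body = encodeRecordBody(new int[] {0, 1, 1});
        byte[] value = toConfluentValue(1, body); // schema ID 1 is hypothetical
        System.out.println(body.length + " " + value.length); // prints "5 10"
    }
}
```

The body is the five bytes 06 00 02 02 00, so the Kafka value itself looks
like a valid encoding of the record under the registered schema.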

Hope this provides enough information.

Best,

Dongwon

Re: How to deserialize Avro enum type in Flink SQL?

Posted by Francesco Guardiani <fr...@ververica.com>.
First of all, are you sure the input data is correct? From the stack trace,
it seems to me the issue might be that the input data is invalid.

Looking at the code of AvroToRowDataConverters, it looks like STRING
should work with Avro enums. Can you provide a minimal reproducer (without
Confluent Schema Registry) with a valid input?

Fwd: How to deserialize Avro enum type in Flink SQL?

Posted by Dongwon Kim <ea...@gmail.com>.
Hi community,

Can I get advice on this question?

Another user just sent me an email asking whether I found a solution or a
workaround for this question, but I'm still stuck there.

Any suggestions?

Thanks in advance,

Dongwon

---------- Forwarded message ---------
From: Dongwon Kim <ea...@gmail.com>
Date: Mon, Aug 9, 2021 at 7:26 PM
Subject: How to deserialize Avro enum type in Flink SQL?
To: user <us...@flink.apache.org>


Hi community,

I have a Kafka topic where the schema of its values is defined by the
"MyRecord" record in the following Avro IDL and registered to the Confluent
Schema Registry.

> @namespace("my.type.avro")
> protocol MyProtocol {
>   enum MyEnumType {
>     TypeVal1, TypeVal2
>   }
>   record MyEntry {
>     MyEnumType type;
>   }
>   record MyRecord {
>     array<MyEntry> entries;
>   }
> }


To read from the topic, I've defined the following DDL:

> CREATE TABLE my_table (
>     `entries` ARRAY<ROW<
>         *`type` ??? (This is the main question)*
>     >>
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = 'my-topic',
>     'properties.bootstrap.servers' = '...:9092',
>     'scan.startup.mode' = 'latest-offset',
>     'value.format' = 'avro-confluent',
>     'value.avro-confluent.schema-registry.url' = 'http://...:8081'
> )


Then I run the following query:

> SELECT * FROM my_table


With Flink 1.13.1, I get the following exception when I use *STRING* for
the type:

> *Caused by: java.io.IOException: Failed to deserialize Avro record.*
>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
>   at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>   at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
>   at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
>   at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
> *Caused by: org.apache.avro.AvroTypeException: Found my.type.avro.MyEnumType, expecting union*
>   at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
>   at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
>   at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>   at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>   at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
>   at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>   at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:81)
>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
>   ... 9 more

I use the STRING type only for fast prototyping.

While reading through [1], I've been thinking about using *RAW('class',
'snapshot')* where 'class' is my.type.avro.MyEnumType, but I'm not sure
whether that is a good idea and, if so, what the value of the snapshot
should be.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/#raw

Thanks in advance,

Dongwon