Posted to user@flink.apache.org by Edwin <xu...@163.com> on 2022/09/28 07:33:38 UTC

Question regarding the Debezium format

Hi guys,


I was trying to use Flink SQL to consume data from a Kafka source whose format is debezium-avro-confluent, and I encountered an AvroTypeException saying "Found something, expecting union", where something is not a type but a field that I defined in the schema registry.
So I looked into the source code, and in org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroDeserializationSchema.createDebeziumAvroRowType I found a comment saying "Debezium Avro contains other information, e.g. "source", "ts_ms", but we don't need them". I am wondering why we don't need them. Both source and ts_ms are in my schema, and I assume that the absence of source and ts_ms caused the exception described above.
I am using Flink 1.15. Any help would be highly appreciated! Thanks!
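For context on that comment: when a Debezium event is turned into a changelog, the row images in `before`/`after` plus the `op` code are enough to decide which rows to emit, which is presumably why `source` and `ts_ms` are not part of the physical row type. The Python sketch below is a simplified model of that mapping, not Flink's implementation; `envelope_to_changelog` and the sample event are hypothetical, and the `+I`/`-U`/`+U`/`-D` labels mirror the short strings of Flink's `RowKind`.

```python
from typing import Any, Dict, List, Tuple

# Labels mirroring the short strings of Flink's RowKind (illustrative only).
INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE = "+I", "-U", "+U", "-D"

Row = Dict[str, Any]


def envelope_to_changelog(envelope: Dict[str, Any]) -> List[Tuple[str, Row]]:
    """Hypothetical helper: map one Debezium change event to changelog rows.

    Only "before", "after" and "op" are read; "source" and "ts_ms" are ignored.
    """
    op = envelope["op"]
    before, after = envelope.get("before"), envelope.get("after")
    if op in ("c", "r"):  # create, or read during the initial snapshot
        return [(INSERT, after)]
    if op == "u":  # update: retract the old image, then emit the new one
        return [(UPDATE_BEFORE, before), (UPDATE_AFTER, after)]
    if op == "d":  # delete: retract the old image
        return [(DELETE, before)]
    raise ValueError(f"unknown Debezium op: {op!r}")


# Made-up sample event; "source" and "ts_ms" are present but never used.
event = {
    "before": {"id": 1, "name": "a"},
    "after": {"id": 1, "name": "b"},
    "source": {"db": "inventory", "table": "customers", "ts_ms": 1664350418000},
    "op": "u",
    "ts_ms": 1664350418123,
}

for kind, row in envelope_to_changelog(event):
    print(kind, row)
```

The sketch would produce the same rows if `source` and `ts_ms` were missing from the event entirely.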

Re: Question regarding the Debezium format

Posted by Ali Bahadir Zeybek <al...@ververica.com>.
Hello Edwin,

Would you mind sharing a simple Flink SQL DDL for the table you are creating
with the Kafka connector and the debezium-avro-confluent format?

Also, can you elaborate on the mechanism that initially publishes to the
schema registry and share the corresponding schema?

In a nutshell, this error message usually indicates that a field which is
nullable on the Flink side is not defined as nullable in the Avro schema.

Sincerely,

Ali
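To check this hypothesis against the registered schema, one can list the fields whose Avro type is not a union containing "null" and compare them with the Flink table, whose columns are nullable unless declared NOT NULL (Flink's Avro converter represents a nullable column as a union of "null" and the column type). The Python sketch below assumes the value schema has already been fetched from the schema registry as JSON; `non_nullable_fields` and the simplified envelope schema are hypothetical.

```python
import json
from typing import Any, List


def is_nullable(avro_type: Any) -> bool:
    """An Avro type is nullable here when it is a union (JSON list) containing "null"."""
    return isinstance(avro_type, list) and "null" in avro_type


def non_nullable_fields(schema: Any, prefix: str = "") -> List[str]:
    """Return dotted paths of record fields whose type is not a nullable union.

    Descends into nested records, including records wrapped in a union.
    Named type references (e.g. "Value" below) are not resolved.
    """
    paths: List[str] = []
    for field in schema.get("fields", []):
        path = prefix + field["name"]
        ftype = field["type"]
        if not is_nullable(ftype):
            paths.append(path)
        # Look inside the field type, or each union branch, for nested records.
        branches = ftype if isinstance(ftype, list) else [ftype]
        for branch in branches:
            if isinstance(branch, dict) and branch.get("type") == "record":
                paths.extend(non_nullable_fields(branch, path + "."))
    return paths


# Simplified, made-up envelope schema for illustration.
envelope_schema = json.loads("""
{
  "type": "record", "name": "Envelope",
  "fields": [
    {"name": "before", "type": ["null", {"type": "record", "name": "Value",
      "fields": [{"name": "id", "type": "int"},
                 {"name": "name", "type": ["null", "string"]}]}]},
    {"name": "after", "type": ["null", "Value"]},
    {"name": "op", "type": "string"},
    {"name": "ts_ms", "type": ["null", "long"]}
  ]
}
""")

print(non_nullable_fields(envelope_schema))
```

Here `before.id` is reported as non-nullable, so a Flink column `id INT` (nullable by default) would be the kind of mismatch described above, whereas `id INT NOT NULL` would line up with the Avro type.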

Re: Question regarding the Debezium format

Posted by Martijn Visser <ma...@apache.org> on Thu, Sep 29, 2022.
Hi Edwin,

I suspect that's because those fields are considered metadata, which is
handled separately. There's
https://issues.apache.org/jira/browse/FLINK-20454 for adding metadata
support to the Debezium format; a PR has been provided but not yet reviewed.
If you could have a look at the PR and check whether it works for you, we
can see if we can get it merged.

Best regards,

Martijn
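A rough model of "metadata treated separately": the physical columns are filled from the row image, while envelope fields such as `ts_ms` or `source.ts_ms` would only be surfaced when a query explicitly asks for them as metadata. The Python sketch below is an assumption-laden illustration, not the FLINK-20454 PR; `split_event`, `METADATA_PATHS` and its key names are hypothetical.

```python
from typing import Any, Dict, Iterable, Optional, Tuple

# Hypothetical metadata keys mapped to their location in the Debezium envelope.
METADATA_PATHS: Dict[str, Tuple[str, ...]] = {
    "ingestion-timestamp": ("ts_ms",),
    "source.timestamp": ("source", "ts_ms"),
    "source.database": ("source", "db"),
    "source.table": ("source", "table"),
}


def lookup(envelope: Dict[str, Any], path: Tuple[str, ...]) -> Optional[Any]:
    """Follow a path of keys into the envelope; return None if a step is missing."""
    value: Any = envelope
    for key in path:
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value


def split_event(
    envelope: Dict[str, Any], requested: Iterable[str]
) -> Tuple[Optional[Dict[str, Any]], Dict[str, Any]]:
    """Return (physical row, requested metadata) for one change event.

    Simplification: uses the "before" image for deletes and "after" otherwise,
    ignoring the update-before row a real changelog would also emit.
    """
    row = envelope["before"] if envelope["op"] == "d" else envelope["after"]
    metadata = {key: lookup(envelope, METADATA_PATHS[key]) for key in requested}
    return row, metadata


# Made-up sample event.
event = {
    "before": None,
    "after": {"id": 1, "name": "a"},
    "source": {"db": "inventory", "table": "customers", "ts_ms": 1664350418000},
    "op": "c",
    "ts_ms": 1664350418123,
}

row, meta = split_event(event, ["source.timestamp", "ingestion-timestamp"])
print(row, meta)
```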
