Posted to issues@flink.apache.org by "Martijn Visser (Jira)" <ji...@apache.org> on 2022/12/16 14:23:00 UTC
[jira] [Comment Edited] (FLINK-30438) The generated schema is not correct when using value.format debezium-avro-confluent
[ https://issues.apache.org/jira/browse/FLINK-30438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17648652#comment-17648652 ]
Martijn Visser edited comment on FLINK-30438 at 12/16/22 2:22 PM:
------------------------------------------------------------------
[~qinjunjerry] Apologies. I intended to only remove the "Can you check and fix", but removed too much accidentally. Just restored that.
was (Author: martijnvisser):
[~qinjunjerry] I intended to only remove the "Can you check and fix", but removed too much accidentally. Just restored that.
> The generated schema is not correct when using value.format debezium-avro-confluent
> -----------------------------------------------------------------------------------
>
> Key: FLINK-30438
> URL: https://issues.apache.org/jira/browse/FLINK-30438
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.16.0
> Reporter: Jun Qin
> Priority: Major
>
> With the following code:
> {code:java}
> CREATE TABLE TEST(
>   ID BIGINT,
>   INTEGRATION_ID STRING,
>   PRIMARY KEY(INTEGRATION_ID) NOT ENFORCED
> ) WITH(
>   'connector' = 'kafka',
>   'topic' = 'TEST',
>   'properties.bootstrap.servers' = 'broker:29092',
>   'properties.group.id' = 'TEST',
>   'key.format' = 'avro-confluent',
>   'key.fields' = 'INTEGRATION_ID',
>   'key.avro-confluent.url' = 'http://schema-registry:8081',
>   'value.format' = 'debezium-avro-confluent',
>   'value.debezium-avro-confluent.url' = 'http://schema-registry:8081',
>   'scan.startup.mode' = 'earliest-offset'
> ); {code}
> and this INSERT statement:
> {code:java}
> INSERT INTO TEST SELECT 1, '1'; {code}
> The schema we get in the schema registry is:
> {code:java}
> [
>   "null",
>   {
>     "fields": [
>       {
>         "default": null,
>         "name": "before",
>         "type": [
>           "null",
>           {
>             "fields": [
>               {
>                 "default": null,
>                 "name": "ID",
>                 "type": [
>                   "null",
>                   "long"
>                 ]
>               },
>               {
>                 "name": "INTEGRATION_ID",
>                 "type": "string"
>               }
>             ],
>             "name": "record_before",
>             "type": "record"
>           }
>         ]
>       },
>       {
>         "default": null,
>         "name": "after",
>         "type": [
>           "null",
>           {
>             "fields": [
>               {
>                 "default": null,
>                 "name": "ID",
>                 "type": [
>                   "null",
>                   "long"
>                 ]
>               },
>               {
>                 "name": "INTEGRATION_ID",
>                 "type": "string"
>               }
>             ],
>             "name": "record_after",
>             "type": "record"
>           }
>         ]
>       },
>       {
>         "default": null,
>         "name": "op",
>         "type": [
>           "null",
>           "string"
>         ]
>       }
>     ],
>     "name": "record",
>     "namespace": "org.apache.flink.avro.generated",
>     "type": "record"
>   }
> ] {code}
> The registered value schema is a top-level union whose first branch is 'null'. I think that leading 'null' shouldn't be there.
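
To make the reported shape concrete, here is a minimal Python sketch that rebuilds the schema from the report as plain data and checks whether its top level is a nullable union. The helper names (`row_record`, `is_nullable_union`, `strip_top_level_null`) are hypothetical and exist only for this illustration. The "expected" result is an assumption based on the reporter's remark, not confirmed Flink behavior or a proposed fix.

```python
def row_record(name):
    """Nested row record as it appears under "before" and "after" in the report."""
    return {
        "type": "record",
        "name": name,
        "fields": [
            {"name": "ID", "type": ["null", "long"], "default": None},
            {"name": "INTEGRATION_ID", "type": "string"},
        ],
    }


# Value schema as reported: a top-level union of "null" and the envelope record.
registered_schema = [
    "null",
    {
        "type": "record",
        "name": "record",
        "namespace": "org.apache.flink.avro.generated",
        "fields": [
            {"name": "before", "type": ["null", row_record("record_before")], "default": None},
            {"name": "after", "type": ["null", row_record("record_after")], "default": None},
            {"name": "op", "type": ["null", "string"], "default": None},
        ],
    },
]


def is_nullable_union(schema):
    """True if the schema is an Avro union (a JSON array) with "null" as a member."""
    return isinstance(schema, list) and "null" in schema


def strip_top_level_null(schema):
    """Hypothetical helper: return the single non-null branch of a nullable union.

    Schemas that are not nullable unions are returned unchanged.
    """
    if not is_nullable_union(schema):
        return schema
    branches = [branch for branch in schema if branch != "null"]
    if len(branches) != 1:
        raise ValueError("expected exactly one non-null branch")
    return branches[0]


# Assumption: the reporter expects the bare envelope record at the top level.
expected_schema = strip_top_level_null(registered_schema)

print(is_nullable_union(registered_schema))  # True: the leading "null" is present
print(is_nullable_union(expected_schema))    # False: a plain record remains
```

Only the outermost union is unwrapped here. The nullable `before`, `after`, and `op` fields inside the record are left as reported.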