Posted to issues@flink.apache.org by "Jun Qin (Jira)" <ji...@apache.org> on 2022/12/16 14:03:00 UTC

[jira] [Commented] (FLINK-30438) The generated schema is not correct when using value.format debezium-avro-confluent

    [ https://issues.apache.org/jira/browse/FLINK-30438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17648648#comment-17648648 ] 

Jun Qin commented on FLINK-30438:
---------------------------------

[~martijnvisser] Is there a reason you removed the original problem description?

> The generated schema is not correct when using value.format debezium-avro-confluent
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-30438
>                 URL: https://issues.apache.org/jira/browse/FLINK-30438
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.16.0
>            Reporter: Jun Qin
>            Priority: Major
>
> With the following table definition:
> {code:sql}
> CREATE TABLE TEST(
>     ID BIGINT,
>     INTEGRATION_ID STRING,
>     PRIMARY KEY(INTEGRATION_ID) NOT ENFORCED
> ) WITH(
>     'connector' = 'kafka',
>     'topic' = 'TEST',
>     'properties.bootstrap.servers' = 'broker:29092',
>     'properties.group.id' = 'TEST',
>     'key.format' = 'avro-confluent',
>     'key.fields' = 'INTEGRATION_ID',
>     'key.avro-confluent.url' = 'http://schema-registry:8081',
>     'value.format' = 'debezium-avro-confluent',
>     'value.debezium-avro-confluent.url' = 'http://schema-registry:8081',
>     'scan.startup.mode' = 'earliest-offset'
> ); {code}
> and this INSERT statement:
> {code:sql}
> INSERT INTO TEST SELECT 1, '1'; {code}
> The schema we get in the schema registry is:
> {code:json}
> [
>   "null",
>   {
>     "fields": [
>       {
>         "default": null,
>         "name": "before",
>         "type": [
>           "null",
>           {
>             "fields": [
>               {
>                 "default": null,
>                 "name": "ID",
>                 "type": [
>                   "null",
>                   "long"
>                 ]
>               },
>               {
>                 "name": "INTEGRATION_ID",
>                 "type": "string"
>               }
>             ],
>             "name": "record_before",
>             "type": "record"
>           }
>         ]
>       },
>       {
>         "default": null,
>         "name": "after",
>         "type": [
>           "null",
>           {
>             "fields": [
>               {
>                 "default": null,
>                 "name": "ID",
>                 "type": [
>                   "null",
>                   "long"
>                 ]
>               },
>               {
>                 "name": "INTEGRATION_ID",
>                 "type": "string"
>               }
>             ],
>             "name": "record_after",
>             "type": "record"
>           }
>         ]
>       },
>       {
>         "default": null,
>         "name": "op",
>         "type": [
>           "null",
>           "string"
>         ]
>       }
>     ],
>     "name": "record",
>     "namespace": "org.apache.flink.avro.generated",
>     "type": "record"
>   }
> ] {code}
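
To make the shape of the registered schema quoted above easier to see at a glance, here is a minimal Python sketch that parses it and summarizes its top level. It is not part of the original report. The helper name `describe_schema` is hypothetical, and `SCHEMA_JSON` is a compacted copy of the schema from the description, with keys reordered and content unchanged.

```python
import json

# Compacted copy of the schema quoted in the report (same content, fewer lines).
SCHEMA_JSON = """
[
  "null",
  {
    "type": "record",
    "name": "record",
    "namespace": "org.apache.flink.avro.generated",
    "fields": [
      {"name": "before", "default": null, "type": ["null", {
        "type": "record", "name": "record_before", "fields": [
          {"name": "ID", "default": null, "type": ["null", "long"]},
          {"name": "INTEGRATION_ID", "type": "string"}]}]},
      {"name": "after", "default": null, "type": ["null", {
        "type": "record", "name": "record_after", "fields": [
          {"name": "ID", "default": null, "type": ["null", "long"]},
          {"name": "INTEGRATION_ID", "type": "string"}]}]},
      {"name": "op", "default": null, "type": ["null", "string"]}
    ]
  }
]
"""


def describe_schema(schema):
    """Hypothetical helper: summarize the top-level shape of a parsed Avro schema."""
    # An Avro union is encoded in JSON as an array of branch schemas.
    is_union = isinstance(schema, list)
    branches = schema if is_union else [schema]
    # Pick the first branch that is a record, if there is one.
    record = next(
        (b for b in branches if isinstance(b, dict) and b.get("type") == "record"),
        None,
    )
    return {
        "top_level_is_union": is_union,
        "nullable": is_union and "null" in branches,
        "record_name": record["name"] if record else None,
        "fields": [f["name"] for f in record["fields"]] if record else [],
    }


summary = describe_schema(json.loads(SCHEMA_JSON))
print(summary)
```

For this schema the summary reports a top-level union containing `"null"` and a record named `record` with the fields `before`, `after`, and `op`. To run the same check against a live registry, the schema would first have to be fetched from it. The subject name is an assumption here: `TEST-value` if the default topic-based naming is in effect.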


