Posted to issues@flink.apache.org by "Jun Qin (Jira)" <ji...@apache.org> on 2022/12/16 13:53:00 UTC

[jira] [Created] (FLINK-30438) The generated schema is not correct when using value.format debezium-avro-confluent

Jun Qin created FLINK-30438:
-------------------------------

             Summary: The generated schema is not correct when using value.format debezium-avro-confluent
                 Key: FLINK-30438
                 URL: https://issues.apache.org/jira/browse/FLINK-30438
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.16.0
            Reporter: Jun Qin


With the following table definition:
{code:sql}
CREATE TABLE TEST(
    ID BIGINT,
    INTEGRATION_ID STRING,
    PRIMARY KEY(INTEGRATION_ID) NOT ENFORCED
) WITH(
    'connector' = 'kafka',
    'topic' = 'TEST',
    'properties.bootstrap.servers' = 'broker:29092',
    'properties.group.id' = 'TEST',
    'key.format' = 'avro-confluent',
    'key.fields' = 'INTEGRATION_ID',
    'key.avro-confluent.url' = 'http://schema-registry:8081',
    'value.format' = 'debezium-avro-confluent',
    'value.debezium-avro-confluent.url' = 'http://schema-registry:8081',
    'scan.startup.mode' = 'earliest-offset'
); {code}
and this INSERT statement:
{code:sql}
INSERT INTO TEST SELECT 1, '1'; {code}
the value schema registered in Schema Registry is:
{code:json}
[
  "null",
  {
    "fields": [
      {
        "default": null,
        "name": "before",
        "type": [
          "null",
          {
            "fields": [
              {
                "default": null,
                "name": "ID",
                "type": [
                  "null",
                  "long"
                ]
              },
              {
                "name": "INTEGRATION_ID",
                "type": "string"
              }
            ],
            "name": "record_before",
            "type": "record"
          }
        ]
      },
      {
        "default": null,
        "name": "after",
        "type": [
          "null",
          {
            "fields": [
              {
                "default": null,
                "name": "ID",
                "type": [
                  "null",
                  "long"
                ]
              },
              {
                "name": "INTEGRATION_ID",
                "type": "string"
              }
            ],
            "name": "record_after",
            "type": "record"
          }
        ]
      },
      {
        "default": null,
        "name": "op",
        "type": [
          "null",
          "string"
        ]
      }
    ],
    "name": "record",
    "namespace": "org.apache.flink.avro.generated",
    "type": "record"
  }
] {code}
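The leading "null" at the top level of the schema does not look correct: the value schema is registered as a union of "null" and the record instead of as the record itself. Could you check and fix this?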
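
To make the problem concrete, here is a minimal sketch (Python, standard library only) that detects the top-level union and extracts the record schema. It assumes the expected value schema is the record without the surrounding union. The helper name strip_top_level_null is hypothetical and not part of Flink or Schema Registry, and the schema below is an abbreviated stand-in for the one shown above.

```python
import json


def strip_top_level_null(schema):
    """Return the single non-null branch of a top-level ["null", X] union.

    Any schema that is not such a union is returned unchanged.
    """
    if isinstance(schema, list):
        branches = [b for b in schema if b != "null"]
        # Only unwrap when "null" was present and exactly one branch remains.
        if len(branches) == 1 and len(branches) < len(schema):
            return branches[0]
    return schema


# Abbreviated stand-in for the registered value schema (fields shortened).
registered = json.loads("""
[
  "null",
  {
    "type": "record",
    "name": "record",
    "namespace": "org.apache.flink.avro.generated",
    "fields": [
      {"name": "op", "type": ["null", "string"], "default": null}
    ]
  }
]
""")

# True when the top level is a union that allows null, as in this report.
is_nullable_union = isinstance(registered, list) and "null" in registered
expected = strip_top_level_null(registered)

print(is_nullable_union)                   # True
print(expected["type"], expected["name"])  # record record
```

Nested unions such as the "before", "after" and "op" fields are left untouched by this sketch. Only the outermost union is in question here.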