You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Jun Qin (Jira)" <ji...@apache.org> on 2022/12/16 13:54:00 UTC
[jira] [Updated] (FLINK-30438) The generated schema is not correct when using value.format debezium-avro-confluent
[ https://issues.apache.org/jira/browse/FLINK-30438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jun Qin updated FLINK-30438:
----------------------------
Description:
With the following code:
{code:sql}
CREATE TABLE TEST(
ID BIGINT,
INTEGRATION_ID STRING,
PRIMARY KEY(INTEGRATION_ID) NOT ENFORCED
) WITH(
'connector' = 'kafka',
'topic' = 'TEST',
'properties.bootstrap.servers' = 'broker:29092',
'properties.group.id' = 'TEST',
'key.format' = 'avro-confluent',
'key.fields' = 'INTEGRATION_ID',
'key.avro-confluent.url' = 'http://schema-registry:8081',
'value.format' = 'debezium-avro-confluent',
'value.debezium-avro-confluent.url' = 'http://schema-registry:8081',
'scan.startup.mode' = 'earliest-offset'
); {code}
and this INSERT statement:
{code:sql}
INSERT INTO TEST SELECT 1, '1'; {code}
The schema we get in the schema registry is:
{code:json}
[
"null",
{
"fields": [
{
"default": null,
"name": "before",
"type": [
"null",
{
"fields": [
{
"default": null,
"name": "ID",
"type": [
"null",
"long"
]
},
{
"name": "INTEGRATION_ID",
"type": "string"
}
],
"name": "record_before",
"type": "record"
}
]
},
{
"default": null,
"name": "after",
"type": [
"null",
{
"fields": [
{
"default": null,
"name": "ID",
"type": [
"null",
"long"
]
},
{
"name": "INTEGRATION_ID",
"type": "string"
}
],
"name": "record_after",
"type": "record"
}
]
},
{
"default": null,
"name": "op",
"type": [
"null",
"string"
]
}
],
"name": "record",
"namespace": "org.apache.flink.avro.generated",
"type": "record"
}
] {code}
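The top-level schema is registered as a union of "null" and the record. The first "null" (the second line of the schema above) shouldn't be there, I think; I would expect the registered value schema to be just the record. Can you check and fix?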
was:
With the following code:
{code:java}
CREATE TABLE TEST(
ID BIGINT,
INTEGRATION_ID STRING,
PRIMARY KEY(INTEGRATION_ID) NOT ENFORCED
) WITH(
'connector' = 'kafka',
'topic' = 'TEST',
'properties.bootstrap.servers' = 'broker:29092',
'properties.group.id' = 'TEST',
'key.format' = 'avro-confluent',
'key.fields' = 'INTEGRATION_ID',
'key.avro-confluent.url' = 'http://schema-registry:8081',
'value.format' = 'debezium-avro-confluent',
'value.debezium-avro-confluent.url' = 'http://schema-registry:8081',
'scan.startup.mode' = 'earliest-offset'
); {code}
and this INSERT statement:
{code:java}
INSERT INTO TEST SELECT 1, '1'; {code}
The schema we get in schema registry is:
{code:java}
[
"null",
{
"fields": [
{
"default": null,
"name": "before",
"type": [
"null",
{
"fields": [
{
"default": null,
"name": "ID",
"type": [
"null",
"long"
]
},
{
"name": "INTEGRATION_ID",
"type": "string"
}
],
"name": "record_before",
"type": "record"
}
]
},
{
"default": null,
"name": "after",
"type": [
"null",
{
"fields": [
{
"default": null,
"name": "ID",
"type": [
"null",
"long"
]
},
{
"name": "INTEGRATION_ID",
"type": "string"
}
],
"name": "record_after",
"type": "record"
}
]
},
{
"default": null,
"name": "op",
"type": [
"null",
"string"
]
}
],
"name": "record",
"namespace": "org.apache.flink.avro.generated",
"type": "record"
}
] {code}
The first 'null' in the schema does not look to be correct. Can you check and fix?
> The generated schema is not correct when using value.format debezium-avro-confluent
> -----------------------------------------------------------------------------------
>
> Key: FLINK-30438
> URL: https://issues.apache.org/jira/browse/FLINK-30438
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.16.0
> Reporter: Jun Qin
> Priority: Major
>
> With the following code:
> {code:sql}
> CREATE TABLE TEST(
> ID BIGINT,
> INTEGRATION_ID STRING,
> PRIMARY KEY(INTEGRATION_ID) NOT ENFORCED
> ) WITH(
> 'connector' = 'kafka',
> 'topic' = 'TEST',
> 'properties.bootstrap.servers' = 'broker:29092',
> 'properties.group.id' = 'TEST',
> 'key.format' = 'avro-confluent',
> 'key.fields' = 'INTEGRATION_ID',
> 'key.avro-confluent.url' = 'http://schema-registry:8081',
> 'value.format' = 'debezium-avro-confluent',
> 'value.debezium-avro-confluent.url' = 'http://schema-registry:8081',
> 'scan.startup.mode' = 'earliest-offset'
> ); {code}
> and this INSERT statement:
> {code:sql}
> INSERT INTO TEST SELECT 1, '1'; {code}
> The schema we get in the schema registry is:
> {code:json}
> [
> "null",
> {
> "fields": [
> {
> "default": null,
> "name": "before",
> "type": [
> "null",
> {
> "fields": [
> {
> "default": null,
> "name": "ID",
> "type": [
> "null",
> "long"
> ]
> },
> {
> "name": "INTEGRATION_ID",
> "type": "string"
> }
> ],
> "name": "record_before",
> "type": "record"
> }
> ]
> },
> {
> "default": null,
> "name": "after",
> "type": [
> "null",
> {
> "fields": [
> {
> "default": null,
> "name": "ID",
> "type": [
> "null",
> "long"
> ]
> },
> {
> "name": "INTEGRATION_ID",
> "type": "string"
> }
> ],
> "name": "record_after",
> "type": "record"
> }
> ]
> },
> {
> "default": null,
> "name": "op",
> "type": [
> "null",
> "string"
> ]
> }
> ],
> "name": "record",
> "namespace": "org.apache.flink.avro.generated",
> "type": "record"
> }
> ] {code}
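> The top-level schema is registered as a union of "null" and the record. The first "null" (the second line of the schema above) shouldn't be there, I think; I would expect the registered value schema to be just the record. Can you check and fix?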
--
This message was sent by Atlassian Jira
(v8.20.10#820010)