You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Jark Wu (Jira)" <ji...@apache.org> on 2023/04/17 02:21:00 UTC
[jira] [Closed] (FLINK-31777) Upsert Kafka use Avro Confluent, key is ok, but all values are null.
[ https://issues.apache.org/jira/browse/FLINK-31777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jark Wu closed FLINK-31777.
---------------------------
Resolution: Not A Problem
> Upsert Kafka use Avro Confluent, key is ok, but all values are null.
> --------------------------------------------------------------------
>
> Key: FLINK-31777
> URL: https://issues.apache.org/jira/browse/FLINK-31777
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Affects Versions: 1.16.0
> Environment: Flink: 1.16.0
> Confluent version: 7.3.3
> Debezium version: 2.1.0/2.0.0
>
> Reporter: Alvin Ge
> Priority: Major
>
> I use Debezium to send data to Kafka in the Confluent Avro format. When I read it with the 'upsert-kafka' connector, all values are null (only the primary key has a value), but with the 'kafka' connector all values are read correctly.
> My upsert-kafka table looks like this:
> {code:java}
> // code placeholder
> create table TEA02
> (
> SUB_SYSTEM_ENAME varchar(255),
> REC_CREATOR varchar(255),
> REC_CREATE_TIME varchar(255),
> REC_REVISOR varchar(255),
> REC_REVISE_TIME varchar(255),
> ARCHIVE_FLAG varchar(255),
> SUB_SYSTEM_CNAME varchar(255),
> SUB_SYSTEM_FNAME varchar(255),
> SUB_SYSTEM_LEVEL varchar(255),
> primary key (SUB_SYSTEM_ENAME) not enforced
> ) WITH (
> 'connector' = 'upsert-kafka',
> 'topic' = 'dev.oracle.JNMMM1.TEA02',
> 'properties.bootstrap.servers' = '10.0.170.213:9092,10.0.170.214:9092,10.0.170.215:9092',
> 'properties.group.id' = 'TEA02',
> 'key.format' = 'avro-confluent',
> 'key.avro-confluent.url' = 'http://10.0.170.213:8081',
> 'value.format' = 'avro-confluent',
> 'value.avro-confluent.url' = 'http://10.0.170.213:8081',
> 'value.fields-include' = 'EXCEPT_KEY'
> ); {code}
> query result:
> ||SUB_SYSTEM_ENAME (this column is the PK)||REC_CREATOR||REC_CREATE_TIME||.......||
> |CJ|null|null|null|
> Explicitly specifying the subjects still does not work:
> {code:java}
> // code placeholder
> 'key.avro-confluent.subject' = 'dev.oracle.JNMMM1.TEA02-key',
> 'value.avro-confluent.subject' = 'dev.oracle.JNMMM1.TEA02-value' {code}
> BTW: All Debezium events are READ operations.
> The confluent schemas are here:
> {code:java}
> // code placeholder
> [{
> "subject": "dev.oracle-key",
> "version": 1,
> "id": 1,
> "schema": "{\"type\":\"record\",\"name\":\"SchemaChangeKey\",\"namespace\":\"io.debezium.connector.oracle\",\"fields\":[{\"name\":\"databaseName\",\"type\":\"string\"}],\"connect.version\":1,\"connect.name\":\"io.debezium.connector.oracle.SchemaChangeKey\"}"
> }, {
> "subject": "dev.oracle-value",
> "version": 1,
> "id": 2,
> "schema": "{\"type\":\"record\",\"name\":\"SchemaChangeValue\",\"namespace\":\"io.debezium.connector.oracle\",\"fields\":[{\"name\":\"source\",\"type\":{\"type\":\"record\",\"name\":\"Source\",\"fields\":[{\"name\":\"version\",\"type\":\"string\"},{\"name\":\"connector\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":\"long\"},{\"name\":\"snapshot\",\"type\":[{\"type\":\"string\",\"connect.version\":1,\"connect.parameters\":{\"allowed\":\"true,last,false,incremental\"},\"connect.default\":\"false\",\"connect.name\":\"io.debezium.data.Enum\"},\"null\"],\"default\":\"false\"},{\"name\":\"db\",\"type\":\"string\"},{\"name\":\"sequence\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"schema\",\"type\":\"string\"},{\"name\":\"table\",\"type\":\"string\"},{\"name\":\"txId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"scn\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"commit_scn\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"lcr_position\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"rs_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ssn\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"redo_thread\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"user_name\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"io.debezium.connector.oracle.Source\"}},{\"name\":\"ts_ms\",\"type\":\"long\"},{\"name\":\"databaseName\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"schemaName\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ddl\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"tableChanges\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"Change\",\"namespace\":\"io.debezium.connector.schema\",\"fields\":[{\"name\":\"type\",\"type\":\"string\"},{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"table\",\"type\":{\"type\":\"record\",\"name\"
:\"Table\",\"fields\":[{\"name\":\"defaultCharsetName\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"primaryKeyColumnNames\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"default\":null},{\"name\":\"columns\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"Column\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"jdbcType\",\"type\":\"int\"},{\"name\":\"nativeType\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"typeName\",\"type\":\"string\"},{\"name\":\"typeExpression\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"charsetName\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"length\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"scale\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"position\",\"type\":\"int\"},{\"name\":\"optional\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"autoIncremented\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"generated\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"comment\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"defaultValueExpression\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"enumValues\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"default\":null}],\"connect.version\":1,\"connect.name\":\"io.debezium.connector.schema.Column\"}}},{\"name\":\"comment\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.version\":1,\"connect.name\":\"io.debezium.connector.schema.Table\"}}],\"connect.version\":1,\"connect.name\":\"io.debezium.connector.schema.Change\"}}}],\"connect.version\":1,\"connect.name\":\"io.debezium.connector.oracle.SchemaChangeValue\"}"
> }, {
> "subject": "dev.oracle.JNMMM1.TEA02-key",
> "version": 1,
> "id": 3,
> "schema": "{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"dev.oracle.JNMMM1.TEA02\",\"fields\":[{\"name\":\"SUB_SYSTEM_ENAME\",\"type\":{\"type\":\"string\",\"connect.default\":\" \"},\"default\":\" \"}],\"connect.name\":\"dev.oracle.JNMMM1.TEA02.Key\"}"
> }, {
> "subject": "dev.oracle.JNMMM1.TEA02-value",
> "version": 1,
> "id": 4,
> "schema": "{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"dev.oracle.JNMMM1.TEA02\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"REC_CREATOR\",\"type\":[{\"type\":\"string\",\"connect.default\":\" \"},\"null\"],\"default\":\" \"},{\"name\":\"REC_CREATE_TIME\",\"type\":[{\"type\":\"string\",\"connect.default\":\" \"},\"null\"],\"default\":\" \"},{\"name\":\"REC_REVISOR\",\"type\":[{\"type\":\"string\",\"connect.default\":\" \"},\"null\"],\"default\":\" \"},{\"name\":\"REC_REVISE_TIME\",\"type\":[{\"type\":\"string\",\"connect.default\":\" \"},\"null\"],\"default\":\" \"},{\"name\":\"ARCHIVE_FLAG\",\"type\":{\"type\":\"string\",\"connect.default\":\" \"},\"default\":\" \"},{\"name\":\"SUB_SYSTEM_ENAME\",\"type\":{\"type\":\"string\",\"connect.default\":\" \"},\"default\":\" \"},{\"name\":\"SUB_SYSTEM_CNAME\",\"type\":{\"type\":\"string\",\"connect.default\":\" \"},\"default\":\" \"},{\"name\":\"SUB_SYSTEM_FNAME\",\"type\":{\"type\":\"string\",\"connect.default\":\" \"},\"default\":\" \"},{\"name\":\"SUB_SYSTEM_LEVEL\",\"type\":{\"type\":\"string\",\"connect.default\":\" \"},\"default\":\" 
\"}],\"connect.name\":\"dev.oracle.JNMMM1.TEA02.Value\"}],\"default\":null},{\"name\":\"after\",\"type\":[\"null\",\"Value\"],\"default\":null},{\"name\":\"source\",\"type\":{\"type\":\"record\",\"name\":\"Source\",\"namespace\":\"io.debezium.connector.oracle\",\"fields\":[{\"name\":\"version\",\"type\":\"string\"},{\"name\":\"connector\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":\"long\"},{\"name\":\"snapshot\",\"type\":[{\"type\":\"string\",\"connect.version\":1,\"connect.parameters\":{\"allowed\":\"true,last,false,incremental\"},\"connect.default\":\"false\",\"connect.name\":\"io.debezium.data.Enum\"},\"null\"],\"default\":\"false\"},{\"name\":\"db\",\"type\":\"string\"},{\"name\":\"sequence\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"schema\",\"type\":\"string\"},{\"name\":\"table\",\"type\":\"string\"},{\"name\":\"txId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"scn\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"commit_scn\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"lcr_position\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"rs_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ssn\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"redo_thread\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"user_name\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"io.debezium.connector.oracle.Source\"}},{\"name\":\"op\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"transaction\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"block\",\"namespace\":\"event\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"total_order\",\"type\":\"long\"},{\"name\":\"data_collection_order\",\"type\":\"long\"}],\"connect.version\":1,\"connect.name\":\"event.block\"}],\"default\":null}],\"connect.version\":1,\"connect.name\":\"dev.oracl
e.JNMMM1.TEA02.Envelope\"}"
> }] {code}
>
>
> ---------------
>
> I tried the 'kafka' connector and the values are still null. Maybe Confluent version 7.3.3 is not compatible with Flink?
> My table looks like this:
>
>
> {code:java}
> // code placeholder
> create table TEA02
> (
> SUB_SYSTEM_ENAME string,
> REC_CREATOR string,
> REC_CREATE_TIME string,
> REC_REVISOR string,
> REC_REVISE_TIME string,
> ARCHIVE_FLAG string,
> SUB_SYSTEM_CNAME string,
> SUB_SYSTEM_FNAME string,
> SUB_SYSTEM_LEVEL string
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'dev.oracle.JNMMM1.TEA02',
> 'properties.bootstrap.servers' = '10.0.170.213:9092,10.0.170.214:9092,10.0.170.215:9092',
> 'properties.group.id' = 'TEA02',
> 'key.format' = 'avro-confluent',
> 'key.fields' = 'SUB_SYSTEM_ENAME',
> 'key.avro-confluent.url' = 'http://10.0.170.213:8081',
> 'value.format' = 'avro-confluent',
> 'value.avro-confluent.url' = 'http://10.0.170.213:8081',
> 'value.fields-include' = 'EXCEPT_KEY',
> 'key.avro-confluent.subject' = 'dev.oracle.JNMMM1.TEA02-key',
> 'value.avro-confluent.subject' = 'dev.oracle.JNMMM1.TEA02-value',
> 'scan.startup.mode' = 'earliest-offset'
> ); {code}
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)