You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Alvin Ge (Jira)" <ji...@apache.org> on 2023/04/12 13:00:04 UTC

[jira] [Comment Edited] (FLINK-31777) Upsert Kafka use Avro Confluent, key is ok, but all values are null.

    [ https://issues.apache.org/jira/browse/FLINK-31777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17711374#comment-17711374 ] 

Alvin Ge edited comment on FLINK-31777 at 4/12/23 1:00 PM:
-----------------------------------------------------------

[~martijnvisser] Hello, do you know which JSON format can be used with upsert-kafka? I tried Debezium-style JSON with `value.format` set to `debezium-json`, but I got an error saying debezium-json is not insert-only. Then I specified `json`, but the values are still null. I don't know how to do this...

Sorry, this is my first time using upsert-kafka.

 

Thanks.


was (Author: ge.bugman):
[~martijnvisser] Hello, do you know what json format can use upsert-kafka? I tried debezium style json and `value.format` specifie `debezium-json`, I got an error debezium-json is not insert only, than I specifie `json` values are still null, I don't know how to do this...

Sorry this is my first time to use upsert-kafka.

> Upsert Kafka use Avro Confluent, key is ok, but all values are null.
> --------------------------------------------------------------------
>
>                 Key: FLINK-31777
>                 URL: https://issues.apache.org/jira/browse/FLINK-31777
>             Project: Flink
>          Issue Type: Improvement
>          Components: kafka
>    Affects Versions: 1.16.0
>         Environment: Flink: 1.16.0
> Confluent version: 7.3.3
> Debezium version: 2.1.0/2.0.0
>  
>            Reporter: Alvin Ge
>            Priority: Major
>
> I use Debezium to send data to Kafka in Confluent Avro format. When I use the 'upsert-kafka' connector, all values are null (the primary key has a value), but with the 'kafka' connector all values are read correctly.
> My upsert-kafka table like this:
> {code:java}
> // code placeholder
> create table TEA02
> (
>     SUB_SYSTEM_ENAME varchar(255),
>     REC_CREATOR      varchar(255),
>     REC_CREATE_TIME  varchar(255),
>     REC_REVISOR      varchar(255),
>     REC_REVISE_TIME  varchar(255),
>     ARCHIVE_FLAG     varchar(255),
>     SUB_SYSTEM_CNAME varchar(255),
>     SUB_SYSTEM_FNAME varchar(255),
>     SUB_SYSTEM_LEVEL varchar(255),
>     primary key (SUB_SYSTEM_ENAME) not enforced
> ) WITH (
>  'connector' = 'upsert-kafka',
>  'topic' = 'dev.oracle.JNMMM1.TEA02',
>  'properties.bootstrap.servers' = '10.0.170.213:9092,10.0.170.214:9092,10.0.170.215:9092',
>  'properties.group.id' = 'TEA02',
>  'key.format' = 'avro-confluent',
>  'key.avro-confluent.url' = 'http://10.0.170.213:8081',
>  'value.format' = 'avro-confluent',
>  'value.avro-confluent.url' = 'http://10.0.170.213:8081',
>  'value.fields-include' = 'EXCEPT_KEY'
> ); {code}
> query result:
> ||SUB_SYSTEM_ENAME (this column is the pk)||REC_CREATOR||REC_CREATE_TIME||.......||
> |CJ|null|null|null|
> Specifying the subject explicitly still does not work:
> {code:java}
> // code placeholder
>  'key.avro-confluent.subject' = 'dev.oracle.JNMMM1.TEA02-key',
>  'value.avro-confluent.subject' = 'dev.oracle.JNMMM1.TEA02-value' {code}
> BTW: All Debezium events are READ operations.
> The confluent schemas are here:
> {code:java}
> // code placeholder
> [{
>     "subject": "dev.oracle-key",
>     "version": 1,
>     "id": 1,
>     "schema": "{\"type\":\"record\",\"name\":\"SchemaChangeKey\",\"namespace\":\"io.debezium.connector.oracle\",\"fields\":[{\"name\":\"databaseName\",\"type\":\"string\"}],\"connect.version\":1,\"connect.name\":\"io.debezium.connector.oracle.SchemaChangeKey\"}"
> }, {
>     "subject": "dev.oracle-value",
>     "version": 1,
>     "id": 2,
>     "schema": "{\"type\":\"record\",\"name\":\"SchemaChangeValue\",\"namespace\":\"io.debezium.connector.oracle\",\"fields\":[{\"name\":\"source\",\"type\":{\"type\":\"record\",\"name\":\"Source\",\"fields\":[{\"name\":\"version\",\"type\":\"string\"},{\"name\":\"connector\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":\"long\"},{\"name\":\"snapshot\",\"type\":[{\"type\":\"string\",\"connect.version\":1,\"connect.parameters\":{\"allowed\":\"true,last,false,incremental\"},\"connect.default\":\"false\",\"connect.name\":\"io.debezium.data.Enum\"},\"null\"],\"default\":\"false\"},{\"name\":\"db\",\"type\":\"string\"},{\"name\":\"sequence\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"schema\",\"type\":\"string\"},{\"name\":\"table\",\"type\":\"string\"},{\"name\":\"txId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"scn\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"commit_scn\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"lcr_position\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"rs_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ssn\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"redo_thread\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"user_name\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"io.debezium.connector.oracle.Source\"}},{\"name\":\"ts_ms\",\"type\":\"long\"},{\"name\":\"databaseName\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"schemaName\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ddl\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"tableChanges\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"Change\",\"namespace\":\"io.debezium.connector.schema\",\"fields\":[{\"name\":\"type\",\"type\":\"string\"},{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"table\",\"type\":{\"type\":\"record\",\"na
me\":\"Table\",\"fields\":[{\"name\":\"defaultCharsetName\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"primaryKeyColumnNames\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"default\":null},{\"name\":\"columns\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"Column\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"jdbcType\",\"type\":\"int\"},{\"name\":\"nativeType\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"typeName\",\"type\":\"string\"},{\"name\":\"typeExpression\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"charsetName\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"length\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"scale\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"position\",\"type\":\"int\"},{\"name\":\"optional\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"autoIncremented\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"generated\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"comment\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"defaultValueExpression\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"enumValues\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"default\":null}],\"connect.version\":1,\"connect.name\":\"io.debezium.connector.schema.Column\"}}},{\"name\":\"comment\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.version\":1,\"connect.name\":\"io.debezium.connector.schema.Table\"}}],\"connect.version\":1,\"connect.name\":\"io.debezium.connector.schema.Change\"}}}],\"connect.version\":1,\"connect.name\":\"io.debezium.connector.oracle.SchemaChangeValue\"}"
> }, {
>     "subject": "dev.oracle.JNMMM1.TEA02-key",
>     "version": 1,
>     "id": 3,
>     "schema": "{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"dev.oracle.JNMMM1.TEA02\",\"fields\":[{\"name\":\"SUB_SYSTEM_ENAME\",\"type\":{\"type\":\"string\",\"connect.default\":\" \"},\"default\":\" \"}],\"connect.name\":\"dev.oracle.JNMMM1.TEA02.Key\"}"
> }, {
>     "subject": "dev.oracle.JNMMM1.TEA02-value",
>     "version": 1,
>     "id": 4,
>     "schema": "{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"dev.oracle.JNMMM1.TEA02\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"REC_CREATOR\",\"type\":[{\"type\":\"string\",\"connect.default\":\" \"},\"null\"],\"default\":\" \"},{\"name\":\"REC_CREATE_TIME\",\"type\":[{\"type\":\"string\",\"connect.default\":\" \"},\"null\"],\"default\":\" \"},{\"name\":\"REC_REVISOR\",\"type\":[{\"type\":\"string\",\"connect.default\":\" \"},\"null\"],\"default\":\" \"},{\"name\":\"REC_REVISE_TIME\",\"type\":[{\"type\":\"string\",\"connect.default\":\" \"},\"null\"],\"default\":\" \"},{\"name\":\"ARCHIVE_FLAG\",\"type\":{\"type\":\"string\",\"connect.default\":\" \"},\"default\":\" \"},{\"name\":\"SUB_SYSTEM_ENAME\",\"type\":{\"type\":\"string\",\"connect.default\":\" \"},\"default\":\" \"},{\"name\":\"SUB_SYSTEM_CNAME\",\"type\":{\"type\":\"string\",\"connect.default\":\" \"},\"default\":\" \"},{\"name\":\"SUB_SYSTEM_FNAME\",\"type\":{\"type\":\"string\",\"connect.default\":\" \"},\"default\":\" \"},{\"name\":\"SUB_SYSTEM_LEVEL\",\"type\":{\"type\":\"string\",\"connect.default\":\" \"},\"default\":\" 
\"}],\"connect.name\":\"dev.oracle.JNMMM1.TEA02.Value\"}],\"default\":null},{\"name\":\"after\",\"type\":[\"null\",\"Value\"],\"default\":null},{\"name\":\"source\",\"type\":{\"type\":\"record\",\"name\":\"Source\",\"namespace\":\"io.debezium.connector.oracle\",\"fields\":[{\"name\":\"version\",\"type\":\"string\"},{\"name\":\"connector\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":\"long\"},{\"name\":\"snapshot\",\"type\":[{\"type\":\"string\",\"connect.version\":1,\"connect.parameters\":{\"allowed\":\"true,last,false,incremental\"},\"connect.default\":\"false\",\"connect.name\":\"io.debezium.data.Enum\"},\"null\"],\"default\":\"false\"},{\"name\":\"db\",\"type\":\"string\"},{\"name\":\"sequence\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"schema\",\"type\":\"string\"},{\"name\":\"table\",\"type\":\"string\"},{\"name\":\"txId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"scn\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"commit_scn\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"lcr_position\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"rs_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ssn\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"redo_thread\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"user_name\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"io.debezium.connector.oracle.Source\"}},{\"name\":\"op\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"transaction\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"block\",\"namespace\":\"event\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"total_order\",\"type\":\"long\"},{\"name\":\"data_collection_order\",\"type\":\"long\"}],\"connect.version\":1,\"connect.name\":\"event.block\"}],\"default\":null}],\"connect.version\":1,\"connect.name\":\"dev.oracl
e.JNMMM1.TEA02.Envelope\"}"
> }] {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)