You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Alvin Ge (Jira)" <ji...@apache.org> on 2023/04/12 11:50:00 UTC

[jira] [Updated] (FLINK-31777) Upsert Kafka use Avro Confluent, key is ok, but all values are null.

     [ https://issues.apache.org/jira/browse/FLINK-31777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alvin Ge updated FLINK-31777:
-----------------------------
    Description: 
I use debezium send data to kafka with confluent avro format,  when I use 'upsert-kafka' connector, all values are null, but in 'kafka' connector all values is well.

My upsert-kafka table like this:

 
{code:java}
// code placeholder

create table TEA02
(
    SUB_SYSTEM_ENAME varchar(255),
    REC_CREATOR      varchar(255),
    REC_CREATE_TIME  varchar(255),
    REC_REVISOR      varchar(255),
    REC_REVISE_TIME  varchar(255),
    ARCHIVE_FLAG     varchar(255),
    SUB_SYSTEM_CNAME varchar(255),
    SUB_SYSTEM_FNAME varchar(255),
    SUB_SYSTEM_LEVEL varchar(255),
    primary key (SUB_SYSTEM_ENAME) not enforced
) WITH (
 'connector' = 'upsert-kafka',
 'topic' = 'dev.oracle.JNMMM1.TEA02',
 'properties.bootstrap.servers' = '10.0.170.213:9092,10.0.170.214:9092,10.0.170.215:9092',
 'properties.group.id' = 'TEA02',
 'key.format' = 'avro-confluent',
 'key.avro-confluent.url' = 'http://10.0.170.213:8081',
 'value.format' = 'avro-confluent',
 'value.avro-confluent.url' = 'http://10.0.170.213:8081',
 'value.fields-include' = 'EXCEPT_KEY'
); {code}
 

query result:

 
||SUB_SYSTEM_ENAME||REC_CREATOR||REC_CREATE_TIME||.......||
|CJ|null|null|null|

 

 

  was:
I use debezium send data to kafka with confluent avro format,  when I use upsert-kafka connector, all values are null, in 'kafka' connector all values is well.

My upsert-kafka table like this:

 
{code:java}
// code placeholder

create table TEA02
(
    SUB_SYSTEM_ENAME varchar(255),
    REC_CREATOR      varchar(255),
    REC_CREATE_TIME  varchar(255),
    REC_REVISOR      varchar(255),
    REC_REVISE_TIME  varchar(255),
    ARCHIVE_FLAG     varchar(255),
    SUB_SYSTEM_CNAME varchar(255),
    SUB_SYSTEM_FNAME varchar(255),
    SUB_SYSTEM_LEVEL varchar(255),
    primary key (SUB_SYSTEM_ENAME) not enforced
) WITH (
 'connector' = 'upsert-kafka',
 'topic' = 'dev.oracle.JNMMM1.TEA02',
 'properties.bootstrap.servers' = '10.0.170.213:9092,10.0.170.214:9092,10.0.170.215:9092',
 'properties.group.id' = 'TEA02',
 'key.format' = 'avro-confluent',
 'key.avro-confluent.url' = 'http://10.0.170.213:8081',
 'value.format' = 'avro-confluent',
 'value.avro-confluent.url' = 'http://10.0.170.213:8081',
 'value.fields-include' = 'EXCEPT_KEY'
); {code}
 

query result:

 
||SUB_SYSTEM_ENAME||REC_CREATOR||REC_CREATE_TIME||.......||
|CJ|null|null|null|

 

 


> Upsert Kafka use Avro Confluent, key is ok, but all values are null.
> --------------------------------------------------------------------
>
>                 Key: FLINK-31777
>                 URL: https://issues.apache.org/jira/browse/FLINK-31777
>             Project: Flink
>          Issue Type: Improvement
>          Components: kafka
>    Affects Versions: 1.16.0
>         Environment: Flink: 1.6.0
> Confluent version: 7.3.3
> Debezium version: 2.1.0/2.0.0
>  
>            Reporter: Alvin Ge
>            Priority: Major
>
> I use debezium send data to kafka with confluent avro format,  when I use 'upsert-kafka' connector, all values are null, but in 'kafka' connector all values is well.
> My upsert-kafka table like this:
>  
> {code:java}
> // code placeholder
> create table TEA02
> (
>     SUB_SYSTEM_ENAME varchar(255),
>     REC_CREATOR      varchar(255),
>     REC_CREATE_TIME  varchar(255),
>     REC_REVISOR      varchar(255),
>     REC_REVISE_TIME  varchar(255),
>     ARCHIVE_FLAG     varchar(255),
>     SUB_SYSTEM_CNAME varchar(255),
>     SUB_SYSTEM_FNAME varchar(255),
>     SUB_SYSTEM_LEVEL varchar(255),
>     primary key (SUB_SYSTEM_ENAME) not enforced
> ) WITH (
>  'connector' = 'upsert-kafka',
>  'topic' = 'dev.oracle.JNMMM1.TEA02',
>  'properties.bootstrap.servers' = '10.0.170.213:9092,10.0.170.214:9092,10.0.170.215:9092',
>  'properties.group.id' = 'TEA02',
>  'key.format' = 'avro-confluent',
>  'key.avro-confluent.url' = 'http://10.0.170.213:8081',
>  'value.format' = 'avro-confluent',
>  'value.avro-confluent.url' = 'http://10.0.170.213:8081',
>  'value.fields-include' = 'EXCEPT_KEY'
> ); {code}
>  
> query result:
>  
> ||SUB_SYSTEM_ENAME||REC_CREATOR||REC_CREATE_TIME||.......||
> |CJ|null|null|null|
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)