You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Zhiwen Sun (Jira)" <ji...@apache.org> on 2022/05/17 09:04:00 UTC

[jira] [Created] (FLINK-27663) upsert-kafka can't process delete message from upsert-kafka sink

Zhiwen Sun created FLINK-27663:
----------------------------------

             Summary: upsert-kafka can't process delete message from upsert-kafka sink
                 Key: FLINK-27663
                 URL: https://issues.apache.org/jira/browse/FLINK-27663
             Project: Flink
          Issue Type: Bug
          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
    Affects Versions: 1.14.4, 1.13.6, 1.15.0
            Reporter: Zhiwen Sun


upsert-kafka write DELETE data as Kafka messages with null values (indicate tombstone for the key).

But when use upsert-kafka as a source table to consumer kafka messages write by upsert-kafka sink, DELETE messages will be ignored.

 

related sql :

 

 
{code:java}
create table order_system_log(
  id bigint,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector' = 'upsert-kafka',
 'topic' = 'test_use',
 'properties.bootstrap.servers' = 'your broker',
 'properties.group.id' = 'your group id',
 'value.json.fail-on-missing-field' = 'false',
 'value.json.ignore-parse-errors' = 'true',
 'key.json.fail-on-missing-field' = 'false',
 'key.json.ignore-parse-errors' = 'true',
 'key.format' = 'json',
 'value.format' = 'json'
);
select
*
from
order_system_log
;
{code}
 

 

The problem may be produced by DeserializationSchema#deserialize,

this method does not collect data while subclass's deserialize return null.

 

 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)