Posted to issues@flink.apache.org by "Zhiwen Sun (Jira)" <ji...@apache.org> on 2022/05/18 03:43:00 UTC

[jira] [Comment Edited] (FLINK-27663) upsert-kafka can't process delete message from upsert-kafka sink

    [ https://issues.apache.org/jira/browse/FLINK-27663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17538556#comment-17538556 ] 

Zhiwen Sun edited comment on FLINK-27663 at 5/18/22 3:42 AM:
-------------------------------------------------------------

 

1. Produce a DELETE message to Kafka:
{code:java}
echo '{"id":1}#null' | kafka-console-producer --broker-list $broker --topic test_use --property "parse.key=true" --property "key.separator=#"{code}
 
2. Run a Flink SQL job:
{code:java}
SET sql-client.execution.result-mode = tableau;

CREATE TABLE delete_test (
  id BIGINT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'test_use',
  'properties.bootstrap.servers' = 'your broker',
  'properties.group.id' = 'your group id',
  'value.json.fail-on-missing-field' = 'false',
  'value.json.ignore-parse-errors' = 'true',
  'key.json.fail-on-missing-field' = 'false',
  'key.json.ignore-parse-errors' = 'true',
  'key.format' = 'json',
  'value.format' = 'json'
);

SELECT * FROM delete_test;
{code}

 

3. Expected output:
 
{code:java}
+----+----------------------+
| op |                   id |
+----+----------------------+
| -D |                    1 | {code}
 
 
Actual: no output.
 



> upsert-kafka can't process delete message from upsert-kafka sink
> ----------------------------------------------------------------
>
>                 Key: FLINK-27663
>                 URL: https://issues.apache.org/jira/browse/FLINK-27663
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.15.0, 1.13.6, 1.14.4
>            Reporter: Zhiwen Sun
>            Priority: Major
>
> upsert-kafka writes DELETE data as Kafka messages with null values (indicating a tombstone for the key).
> But when upsert-kafka is used as a source table to consume Kafka messages written by an upsert-kafka sink, the DELETE messages are ignored.
>  
> Related SQL:
>  
>  
> {code:java}
> CREATE TABLE order_system_log (
>   id BIGINT,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'test_use',
>   'properties.bootstrap.servers' = 'your broker',
>   'properties.group.id' = 'your group id',
>   'value.json.fail-on-missing-field' = 'false',
>   'value.json.ignore-parse-errors' = 'true',
>   'key.json.fail-on-missing-field' = 'false',
>   'key.json.ignore-parse-errors' = 'true',
>   'key.format' = 'json',
>   'value.format' = 'json'
> );
>
> SELECT * FROM order_system_log;
> {code}
>  
>  
> The problem may be caused by DeserializationSchema#deserialize:
> this method does not collect a record when the subclass's deserialize returns null.
>  
>  
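
The suspicion in the description can be illustrated with a small, self-contained Java sketch. It does not use Flink: SimpleDeserializer, VALUE_FORMAT and emitted are hypothetical names invented here, and the toy value format, which decodes both a missing value and the JSON literal null to null, is an assumption made only for illustration. The sketch shows how a collector-style deserialize that skips null results would produce no output at all for such messages, which matches the "no output" symptom in the reproduction.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class NullDropSketch {

    /** Hypothetical stand-in for a deserialization schema; NOT Flink's real interface. */
    interface SimpleDeserializer<T> {
        /** Returns the decoded row, or null when nothing can be decoded. */
        T deserialize(byte[] message);

        /** Collector variant: forwards the row only when it is non-null. */
        default void deserialize(byte[] message, Consumer<T> out) {
            T row = deserialize(message);
            if (row != null) {
                out.accept(row);
            }
        }
    }

    /** Toy value format (assumption): a missing value or the JSON literal null decodes to null. */
    static final SimpleDeserializer<String> VALUE_FORMAT = message -> {
        if (message == null) {
            return null;
        }
        String text = new String(message, StandardCharsets.UTF_8).trim();
        return text.equals("null") ? null : text;
    };

    /** Runs one message value through the collector variant and returns what was emitted. */
    static List<String> emitted(byte[] value) {
        List<String> out = new ArrayList<>();
        VALUE_FORMAT.deserialize(value, out::add);
        return out;
    }

    public static void main(String[] args) {
        // A regular value is emitted downstream.
        System.out.println(emitted("{\"id\":1}".getBytes(StandardCharsets.UTF_8))); // [{"id":1}]
        // The literal text "null" decodes to null and is silently skipped.
        System.out.println(emitted("null".getBytes(StandardCharsets.UTF_8)));       // []
        // A missing value (tombstone) is skipped the same way in this model.
        System.out.println(emitted(null));                                          // []
    }
}
```

In this model a source that wants to turn a null result into a DELETE row has to handle that case before delegating to the collector variant, because the null never reaches the collector. Whether the actual connector code path behaves this way for the message produced in step 1 would need to be confirmed against the Flink sources.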


