You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Martijn Visser (Jira)" <ji...@apache.org> on 2022/05/17 09:09:00 UTC
[jira] [Commented] (FLINK-27663) upsert-kafka can't process delete message from upsert-kafka sink
[ https://issues.apache.org/jira/browse/FLINK-27663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17538047#comment-17538047 ]
Martijn Visser commented on FLINK-27663:
----------------------------------------
[~pensz] What is the error that you're getting? Based on https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/upsert-kafka/ I would expect that in changelog mode you would get a record with a null value.
> upsert-kafka can't process delete message from upsert-kafka sink
> ----------------------------------------------------------------
>
> Key: FLINK-27663
> URL: https://issues.apache.org/jira/browse/FLINK-27663
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.15.0, 1.13.6, 1.14.4
> Reporter: Zhiwen Sun
> Priority: Major
>
> upsert-kafka write DELETE data as Kafka messages with null values (indicate tombstone for the key).
> But when use upsert-kafka as a source table to consumer kafka messages write by upsert-kafka sink, DELETE messages will be ignored.
>
> related sql :
>
>
> {code:java}
> create table order_system_log(
> id bigint,
> PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
> 'connector' = 'upsert-kafka',
> 'topic' = 'test_use',
> 'properties.bootstrap.servers' = 'your broker',
> 'properties.group.id' = 'your group id',
> 'value.json.fail-on-missing-field' = 'false',
> 'value.json.ignore-parse-errors' = 'true',
> 'key.json.fail-on-missing-field' = 'false',
> 'key.json.ignore-parse-errors' = 'true',
> 'key.format' = 'json',
> 'value.format' = 'json'
> );
> select
> *
> from
> order_system_log
> ;
> {code}
>
>
> The problem may be produced by DeserializationSchema#deserialize,
> this method does not collect data while subclass's deserialize return null.
>
>
--
This message was sent by Atlassian Jira
(v8.20.7#820007)