You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "tanyao (Jira)" <ji...@apache.org> on 2022/06/23 03:02:00 UTC

[jira] [Created] (FLINK-28209) KafkaSink with EXACTLY_ONCE produce reduplicate data(flink kafka connector1.14.4)

tanyao created FLINK-28209:
------------------------------

             Summary: KafkaSink with EXACTLY_ONCE  produce reduplicate data(flink kafka connector1.14.4)
                 Key: FLINK-28209
                 URL: https://issues.apache.org/jira/browse/FLINK-28209
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.14.4
            Reporter: tanyao
         Attachments: image-2022-06-23-10-49-01-213.png, image-2022-06-23-10-58-15-141.png

I'm trying to read mysql binlog and transport it to kafka;

here is what i'm using :

*Flink: 1.14.4*

*Flink-CDC : 2.2*

*Kafka: CDH6.2(2.1)*

 

*Stage-1:* 

mysql-cdc-connector was used to consume mysql binlog data . about 40W rows changed when i executed some sql in mysql, and i can get those 40W rows without any data lose or reduplicate, just the some number as mysql changed . So, i don't think cdc is the problem.

 

Stage-2:

when i got binlog data, first i deserialized it to type of Tuple2<String,String>, which tuple2.f0 has the format  "db.table" and i intend to use it as kafka topic for every different db.table, tuple2.f1 contains binlog value only.

 

*Stage-3:*

then, i used KafkaSink (which was introduced in flink 1.14) to write binlog to different kafka topic as tuple2.f0 indicated. 

Here is the code like :

!image-2022-06-23-10-49-01-213.png!

 

As u can see, I just want to use EXACTLY_ONCE semantics,but here is the problem:

after about 10mins waiting for all binlog consumed, i checked all data in a single kafka topic   (just one topic ), the total number of rows is much more than the number of binlog rows from mysql data changed, because too many reduplicated data sink to kafka. For example

!image-2022-06-23-10-58-15-141.png!

 

Stage-4:

however, when i changed  EXACTLY_ONCE. to.  AT_LEAST_ONCE, everything worked very well, no more reduplicated data in kafka.

 

 

So i'm wonderring , is there any bug in KafkaSink when EXACTLY_ONCE is configured.

 

 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)