You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/06 08:21:34 UTC
[inlong] branch master updated: [INLONG-7161][Sort] Fix bug that Mysql connector only output the latest record in snapshot stage for table without primary key (Supplement) (#7175)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 217afe1ad [INLONG-7161][Sort] Fix bug that Mysql connector only output the latest record in snapshot stage for table without primary key (Supplement) (#7175)
217afe1ad is described below
commit 217afe1ade03482fdf43d282fbe63721a5e61c25
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Fri Jan 6 16:21:28 2023 +0800
[INLONG-7161][Sort] Fix bug that Mysql connector only output the latest record in snapshot stage for table without primary key (Supplement) (#7175)
Co-authored-by: stingpeng <st...@tencent.com>
---
.../inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java
index 2a455de98..2e6a9f85d 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java
@@ -188,7 +188,8 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySqlSpli
if (isDataChangeRecord(sourceRecord)) {
TableId tableId = getTableId(sourceRecord);
BinlogOffset position = getBinlogPosition(sourceRecord);
- if (hasEnterPureBinlogPhase(tableId, position)) {
+ // a source record with a null key (table without a primary key) needs no binlog position comparison
+ if (hasEnterPureBinlogPhase(tableId, position) || sourceRecord.key() == null) {
return true;
}
// only the table who captured snapshot splits need to filter