You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/05 11:43:57 UTC

[inlong] 05/05: [INLONG-7161][Sort] Fix bug where the MySQL connector only outputs the latest record in the snapshot stage for tables without a primary key (#7164)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit b96d4ed4aa21f917ae2c49833e3a2925c295c3f5
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Thu Jan 5 19:19:02 2023 +0800

    [INLONG-7161][Sort] Fix bug where the MySQL connector only outputs the latest record in the snapshot stage for tables without a primary key (#7164)
    
    Co-authored-by: stingpeng <st...@tencent.com>
---
 .../sort/cdc/mysql/source/utils/RecordUtils.java   | 31 ++++++++++++++++------
 1 file changed, 23 insertions(+), 8 deletions(-)

diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
index ef7ef4ca9..6944bd795 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
@@ -88,7 +88,8 @@ public class RecordUtils {
             List<SourceRecord> sourceRecords,
             SchemaNameAdjuster nameAdjuster) {
         List<SourceRecord> normalizedRecords = new ArrayList<>();
-        Map<Struct, SourceRecord> snapshotRecords = new HashMap<>();
+        Map<Struct, SourceRecord> snapshotRecordsWithKey = new HashMap<>();
+        List<SourceRecord> snapshotRecordsWithoutKey = new ArrayList<>();
         List<SourceRecord> binlogRecords = new ArrayList<>();
         if (!sourceRecords.isEmpty()) {
 
@@ -103,7 +104,11 @@ public class RecordUtils {
             for (; i < sourceRecords.size(); i++) {
                 SourceRecord sourceRecord = sourceRecords.get(i);
                 if (!isHighWatermarkEvent(sourceRecord)) {
-                    snapshotRecords.put((Struct) sourceRecord.key(), sourceRecord);
+                    if (sourceRecord.key() == null) {
+                        snapshotRecordsWithoutKey.add(sourceRecord);
+                    } else {
+                        snapshotRecordsWithKey.put((Struct) sourceRecord.key(), sourceRecord);
+                    }
                 } else {
                     highWatermark = sourceRecord;
                     i++;
@@ -130,8 +135,11 @@ public class RecordUtils {
                     String.format(
                             "The last record should be high watermark signal event, but is %s",
                             highWatermark));
+
             normalizedRecords =
-                    upsertBinlog(lowWatermark, highWatermark, snapshotRecords, binlogRecords);
+                    upsertBinlog(lowWatermark, highWatermark, snapshotRecordsWithKey,
+                            binlogRecords, snapshotRecordsWithoutKey);
+
         }
         return normalizedRecords;
     }
@@ -139,8 +147,9 @@ public class RecordUtils {
     private static List<SourceRecord> upsertBinlog(
             SourceRecord lowWatermarkEvent,
             SourceRecord highWatermarkEvent,
-            Map<Struct, SourceRecord> snapshotRecords,
-            List<SourceRecord> binlogRecords) {
+            Map<Struct, SourceRecord> snapshotRecordsWithKey,
+            List<SourceRecord> binlogRecords,
+            List<SourceRecord> snapshotRecordsWithoutKey) {
         // upsert binlog events to snapshot events of split
         if (!binlogRecords.isEmpty()) {
             for (SourceRecord binlog : binlogRecords) {
@@ -169,10 +178,10 @@ public class RecordUtils {
                                             binlog.key(),
                                             binlog.valueSchema(),
                                             envelope.read(after, source, fetchTs));
-                            snapshotRecords.put(key, record);
+                            snapshotRecordsWithKey.put(key, record);
                             break;
                         case DELETE:
-                            snapshotRecords.remove(key);
+                            snapshotRecordsWithKey.remove(key);
                             break;
                         case READ:
                             throw new IllegalStateException(
@@ -188,7 +197,13 @@ public class RecordUtils {
 
         final List<SourceRecord> normalizedRecords = new ArrayList<>();
         normalizedRecords.add(lowWatermarkEvent);
-        normalizedRecords.addAll(formatMessageTimestamp(snapshotRecords.values()));
+        if (!snapshotRecordsWithoutKey.isEmpty()) {
+            // for table without key, there is no need for binlog upsert
+            // because highWatermark equals to lowWatermark
+            normalizedRecords.addAll(formatMessageTimestamp(snapshotRecordsWithoutKey));
+        } else {
+            normalizedRecords.addAll(formatMessageTimestamp(snapshotRecordsWithKey.values()));
+        }
         normalizedRecords.add(highWatermarkEvent);
 
         return normalizedRecords;