You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/05 11:43:57 UTC
[inlong] 05/05: [INLONG-7161][Sort] Fix bug that MySQL connector only outputs the latest record in the snapshot stage for tables without a primary key (#7164)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit b96d4ed4aa21f917ae2c49833e3a2925c295c3f5
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Thu Jan 5 19:19:02 2023 +0800
[INLONG-7161][Sort] Fix bug that MySQL connector only outputs the latest record in the snapshot stage for tables without a primary key (#7164)
Co-authored-by: stingpeng <st...@tencent.com>
---
.../sort/cdc/mysql/source/utils/RecordUtils.java | 31 ++++++++++++++++------
1 file changed, 23 insertions(+), 8 deletions(-)
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
index ef7ef4ca9..6944bd795 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
@@ -88,7 +88,8 @@ public class RecordUtils {
List<SourceRecord> sourceRecords,
SchemaNameAdjuster nameAdjuster) {
List<SourceRecord> normalizedRecords = new ArrayList<>();
- Map<Struct, SourceRecord> snapshotRecords = new HashMap<>();
+ Map<Struct, SourceRecord> snapshotRecordsWithKey = new HashMap<>();
+ List<SourceRecord> snapshotRecordsWithoutKey = new ArrayList<>();
List<SourceRecord> binlogRecords = new ArrayList<>();
if (!sourceRecords.isEmpty()) {
@@ -103,7 +104,11 @@ public class RecordUtils {
for (; i < sourceRecords.size(); i++) {
SourceRecord sourceRecord = sourceRecords.get(i);
if (!isHighWatermarkEvent(sourceRecord)) {
- snapshotRecords.put((Struct) sourceRecord.key(), sourceRecord);
+ if (sourceRecord.key() == null) {
+ snapshotRecordsWithoutKey.add(sourceRecord);
+ } else {
+ snapshotRecordsWithKey.put((Struct) sourceRecord.key(), sourceRecord);
+ }
} else {
highWatermark = sourceRecord;
i++;
@@ -130,8 +135,11 @@ public class RecordUtils {
String.format(
"The last record should be high watermark signal event, but is %s",
highWatermark));
+
normalizedRecords =
- upsertBinlog(lowWatermark, highWatermark, snapshotRecords, binlogRecords);
+ upsertBinlog(lowWatermark, highWatermark, snapshotRecordsWithKey,
+ binlogRecords, snapshotRecordsWithoutKey);
+
}
return normalizedRecords;
}
@@ -139,8 +147,9 @@ public class RecordUtils {
private static List<SourceRecord> upsertBinlog(
SourceRecord lowWatermarkEvent,
SourceRecord highWatermarkEvent,
- Map<Struct, SourceRecord> snapshotRecords,
- List<SourceRecord> binlogRecords) {
+ Map<Struct, SourceRecord> snapshotRecordsWithKey,
+ List<SourceRecord> binlogRecords,
+ List<SourceRecord> snapshotRecordsWithoutKey) {
// upsert binlog events to snapshot events of split
if (!binlogRecords.isEmpty()) {
for (SourceRecord binlog : binlogRecords) {
@@ -169,10 +178,10 @@ public class RecordUtils {
binlog.key(),
binlog.valueSchema(),
envelope.read(after, source, fetchTs));
- snapshotRecords.put(key, record);
+ snapshotRecordsWithKey.put(key, record);
break;
case DELETE:
- snapshotRecords.remove(key);
+ snapshotRecordsWithKey.remove(key);
break;
case READ:
throw new IllegalStateException(
@@ -188,7 +197,13 @@ public class RecordUtils {
final List<SourceRecord> normalizedRecords = new ArrayList<>();
normalizedRecords.add(lowWatermarkEvent);
- normalizedRecords.addAll(formatMessageTimestamp(snapshotRecords.values()));
+ if (!snapshotRecordsWithoutKey.isEmpty()) {
+ // for a table without a key, there is no need for binlog upsert
+ // because highWatermark equals lowWatermark
+ normalizedRecords.addAll(formatMessageTimestamp(snapshotRecordsWithoutKey));
+ } else {
+ normalizedRecords.addAll(formatMessageTimestamp(snapshotRecordsWithKey.values()));
+ }
normalizedRecords.add(highWatermarkEvent);
return normalizedRecords;