You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/30 06:05:15 UTC

[inlong] branch master updated: [INLONG-7697][Sort] Reduce the memory usage of JM when split table chunks (#7726)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ecff1b430 [INLONG-7697][Sort] Reduce the memory usage of JM when split table chunks (#7726)
ecff1b430 is described below

commit ecff1b4307df5345b5f08c55d56c5ad897c444ad
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Thu Mar 30 14:05:09 2023 +0800

    [INLONG-7697][Sort] Reduce the memory usage of JM when split table chunks (#7726)
---
 .../source/assigners/MySqlHybridSplitAssigner.java |  6 +-
 .../assigners/MySqlSnapshotSplitAssigner.java      | 61 +++++++++++------
 .../state/PendingSplitsStateSerializer.java        | 80 +++++++++++++++++-----
 .../state/SnapshotPendingSplitsState.java          | 24 ++++---
 .../source/split/MySqlSchemalessSnapshotSplit.java | 67 ++++++++++++++++++
 .../cdc/mysql/source/split/MySqlSnapshotSplit.java |  7 ++
 .../sort/cdc/mysql/source/split/MySqlSplit.java    |  3 +-
 .../mysql/source/split/MySqlSplitSerializer.java   |  4 +-
 licenses/inlong-sort-connectors/LICENSE            |  1 +
 9 files changed, 200 insertions(+), 53 deletions(-)

diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlHybridSplitAssigner.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlHybridSplitAssigner.java
index ccf82d3ae..8c7bc4ecc 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlHybridSplitAssigner.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlHybridSplitAssigner.java
@@ -24,7 +24,7 @@ import org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceConfig;
 import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffset;
 import org.apache.inlong.sort.cdc.mysql.source.split.FinishedSnapshotSplitInfo;
 import org.apache.inlong.sort.cdc.mysql.source.split.MySqlBinlogSplit;
-import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSnapshotSplit;
+import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSchemalessSnapshotSplit;
 import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSplit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -186,7 +186,7 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
     // --------------------------------------------------------------------------------------------
 
     private MySqlBinlogSplit createBinlogSplit() {
-        final List<MySqlSnapshotSplit> assignedSnapshotSplit =
+        final List<MySqlSchemalessSnapshotSplit> assignedSnapshotSplit =
                 snapshotSplitAssigner.getAssignedSplits().values().stream()
                         .sorted(Comparator.comparing(MySqlSplit::splitId))
                         .collect(Collectors.toList());
@@ -196,7 +196,7 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
         final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
 
         BinlogOffset minBinlogOffset = null;
-        for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
+        for (MySqlSchemalessSnapshotSplit split : assignedSnapshotSplit) {
             // find the min binlog offset
             BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
             if (minBinlogOffset == null || binlogOffset.isBefore(minBinlogOffset)) {
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
index d270fa792..fb85c8c30 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.cdc.mysql.source.assigners;
 
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
 import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
@@ -29,6 +30,7 @@ import org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceConfig;
 import org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions;
 import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffset;
 import org.apache.inlong.sort.cdc.mysql.source.split.FinishedSnapshotSplitInfo;
+import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSchemalessSnapshotSplit;
 import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSnapshotSplit;
 import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSplit;
 import org.slf4j.Logger;
@@ -65,14 +67,14 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
     private static final Logger LOG = LoggerFactory.getLogger(MySqlSnapshotSplitAssigner.class);
 
     private final List<TableId> alreadyProcessedTables;
-    private final List<MySqlSnapshotSplit> remainingSplits;
-    private final Map<String, MySqlSnapshotSplit> assignedSplits;
+    private final List<MySqlSchemalessSnapshotSplit> remainingSplits;
+    private final Map<String, MySqlSchemalessSnapshotSplit> assignedSplits;
     private final Map<String, BinlogOffset> splitFinishedOffsets;
     private final MySqlSourceConfig sourceConfig;
     private final int currentParallelism;
     private final List<TableId> remainingTables;
     private final boolean isRemainingTablesCheckpointed;
-
+    private final Map<TableId, TableChanges.TableChange> tableSchemas;
     private AssignerStatus assignerStatus;
     private ChunkSplitter chunkSplitter;
     private boolean isTableIdCaseSensitive;
@@ -98,7 +100,8 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
                 AssignerStatus.INITIAL_ASSIGNING,
                 remainingTables,
                 isTableIdCaseSensitive,
-                true);
+                true,
+                new HashMap<>());
     }
 
     public MySqlSnapshotSplitAssigner(
@@ -115,25 +118,28 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
                 checkpoint.getSnapshotAssignerStatus(),
                 checkpoint.getRemainingTables(),
                 checkpoint.isTableIdCaseSensitive(),
-                checkpoint.isRemainingTablesCheckpointed());
+                checkpoint.isRemainingTablesCheckpointed(),
+                checkpoint.getTableSchemas());
     }
 
     private MySqlSnapshotSplitAssigner(
             MySqlSourceConfig sourceConfig,
             int currentParallelism,
             List<TableId> alreadyProcessedTables,
-            List<MySqlSnapshotSplit> remainingSplits,
-            Map<String, MySqlSnapshotSplit> assignedSplits,
+            List<MySqlSchemalessSnapshotSplit> remainingSplits,
+            Map<String, MySqlSchemalessSnapshotSplit> assignedSplits,
             Map<String, BinlogOffset> splitFinishedOffsets,
             AssignerStatus assignerStatus,
             List<TableId> remainingTables,
             boolean isTableIdCaseSensitive,
-            boolean isRemainingTablesCheckpointed) {
+            boolean isRemainingTablesCheckpointed,
+            Map<TableId, TableChanges.TableChange> tableSchemas) {
         this.sourceConfig = sourceConfig;
         this.currentParallelism = currentParallelism;
         this.alreadyProcessedTables = alreadyProcessedTables;
         this.remainingSplits = new CopyOnWriteArrayList<>(remainingSplits);
         this.assignedSplits = assignedSplits;
+        this.tableSchemas = tableSchemas;
         this.splitFinishedOffsets = splitFinishedOffsets;
         this.assignerStatus = assignerStatus;
         this.remainingTables = new CopyOnWriteArrayList<>(remainingTables);
@@ -206,14 +212,23 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
             executor.submit(
                     () -> {
                         try {
-                            Iterator<TableId> iterator = remainingTables.iterator();
-                            while (iterator.hasNext()) {
-                                TableId nextTable = iterator.next();
+
+                            for (TableId nextTable : remainingTables) {
                                 // split the given table into chunks (snapshot splits)
                                 Collection<MySqlSnapshotSplit> splits =
                                         chunkSplitter.generateSplits(nextTable);
+
+                                if (!splits.isEmpty()) {
+                                    tableSchemas.putAll(new HashMap<>(
+                                            splits.iterator().next().getTableSchemas()));
+                                }
+
+                                final List<MySqlSchemalessSnapshotSplit> schemaLessSnapshotSplits =
+                                        splits.stream()
+                                                .map(MySqlSnapshotSplit::toSchemaLessSnapshotSplit)
+                                                .collect(Collectors.toList());
                                 synchronized (lock) {
-                                    remainingSplits.addAll(splits);
+                                    remainingSplits.addAll(schemaLessSnapshotSplits);
                                     remainingTables.remove(nextTable);
                                     lock.notify();
                                 }
@@ -230,12 +245,13 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
         synchronized (lock) {
             if (!remainingSplits.isEmpty()) {
                 // return remaining splits firstly
-                Iterator<MySqlSnapshotSplit> iterator = remainingSplits.iterator();
-                MySqlSnapshotSplit split = iterator.next();
+                Iterator<MySqlSchemalessSnapshotSplit> iterator = remainingSplits.iterator();
+                MySqlSchemalessSnapshotSplit split = iterator.next();
                 remainingSplits.remove(split);
                 assignedSplits.put(split.splitId(), split);
                 addAlreadyProcessedTablesIfNotExists(split.getTableId());
-                return Optional.of(split);
+                return Optional.of(
+                        split.toMySqlSnapshotSplit(tableSchemas.get(split.getTableId())));
             } else if (!remainingTables.isEmpty()) {
                 try {
                     // wait for the asynchronous split to complete
@@ -265,12 +281,12 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
             throw new FlinkRuntimeException(
                     "The assigner is not ready to offer finished split information, this should not be called");
         }
-        final List<MySqlSnapshotSplit> assignedSnapshotSplit =
+        final List<MySqlSchemalessSnapshotSplit> assignedSnapshotSplit =
                 assignedSplits.values().stream()
                         .sorted(Comparator.comparing(MySqlSplit::splitId))
                         .collect(Collectors.toList());
         List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
-        for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
+        for (MySqlSchemalessSnapshotSplit split : assignedSnapshotSplit) {
             BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
             finishedSnapshotSplitInfos.add(
                     new FinishedSnapshotSplitInfo(
@@ -305,7 +321,8 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
     @Override
     public void addSplits(Collection<MySqlSplit> splits) {
         for (MySqlSplit split : splits) {
-            remainingSplits.add(split.asSnapshotSplit());
+            tableSchemas.putAll(split.asSnapshotSplit().getTableSchemas());
+            remainingSplits.add(split.asSnapshotSplit().toSchemaLessSnapshotSplit());
             // we should remove the add-backed splits from the assigned list,
             // because they are failed
             assignedSplits.remove(split.splitId());
@@ -324,7 +341,8 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
                         assignerStatus,
                         remainingTables,
                         isTableIdCaseSensitive,
-                        true);
+                        true,
+                        tableSchemas);
         // we need a complete checkpoint before mark this assigner to be finished, to wait for all
         // records of snapshot splits are completely processed
         if (checkpointIdToFinish == null
@@ -392,7 +410,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
         return remainingTables.isEmpty() && remainingSplits.isEmpty();
     }
 
-    public Map<String, MySqlSnapshotSplit> getAssignedSplits() {
+    public Map<String, MySqlSchemalessSnapshotSplit> getAssignedSplits() {
         return assignedSplits;
     }
 
@@ -402,6 +420,9 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
         return splitFinishedOffsets;
     }
 
+    public Map<TableId, TableChanges.TableChange> getTableSchemas() {
+        return tableSchemas;
+    }
     /**
      * Returns whether all splits are finished which means no more splits and all assigned splits
      * are finished.
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/PendingSplitsStateSerializer.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/PendingSplitsStateSerializer.java
index 7bd20baf2..8d45ca8f5 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/PendingSplitsStateSerializer.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/PendingSplitsStateSerializer.java
@@ -18,11 +18,14 @@
 package org.apache.inlong.sort.cdc.mysql.source.assigners.state;
 
 import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import io.debezium.relational.history.TableChanges.TableChange;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.inlong.sort.cdc.mysql.source.assigners.AssignerStatus;
 import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffset;
+import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSchemalessSnapshotSplit;
 import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSnapshotSplit;
 import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSplit;
 
@@ -33,6 +36,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.inlong.sort.cdc.mysql.source.assigners.AssignerStatus.fromStatusCode;
+import static org.apache.inlong.sort.cdc.mysql.source.split.MySqlSplitSerializer.readTableSchemas;
+import static org.apache.inlong.sort.cdc.mysql.source.split.MySqlSplitSerializer.writeTableSchemas;
 import static org.apache.inlong.sort.cdc.mysql.source.utils.SerializerUtils.readBinlogPosition;
 import static org.apache.inlong.sort.cdc.mysql.source.utils.SerializerUtils.writeBinlogPosition;
 
@@ -42,7 +48,7 @@ import static org.apache.inlong.sort.cdc.mysql.source.utils.SerializerUtils.writ
  */
 public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<PendingSplitsState> {
 
-    private static final int VERSION = 3;
+    private static final int VERSION = 4;
     private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
             ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
 
@@ -101,7 +107,7 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
                 return deserializeLegacyPendingSplitsState(serialized);
             case 3:
             case 4:
-                return deserializePendingSplitsState(serialized);
+                return deserializePendingSplitsState(version, serialized);
             default:
                 throw new IOException("Unknown version: " + version);
         }
@@ -124,14 +130,14 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
         }
     }
 
-    public PendingSplitsState deserializePendingSplitsState(byte[] serialized) throws IOException {
+    public PendingSplitsState deserializePendingSplitsState(int version, byte[] serialized) throws IOException {
         final DataInputDeserializer in = new DataInputDeserializer(serialized);
         final int splitVersion = in.readInt();
         final int stateFlag = in.readInt();
         if (stateFlag == SNAPSHOT_PENDING_SPLITS_STATE_FLAG) {
-            return deserializeSnapshotPendingSplitsState(splitVersion, in);
+            return deserializeSnapshotPendingSplitsState(version, splitVersion, in);
         } else if (stateFlag == HYBRID_PENDING_SPLITS_STATE_FLAG) {
-            return deserializeHybridPendingSplitsState(splitVersion, in);
+            return deserializeHybridPendingSplitsState(version, splitVersion, in);
         } else if (stateFlag == BINLOG_PENDING_SPLITS_STATE_FLAG) {
             return deserializeBinlogPendingSplitsState(in);
         } else {
@@ -153,6 +159,7 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
         out.writeInt(state.getSnapshotAssignerStatus().getStatusCode());
         writeTableIds(state.getRemainingTables(), out);
         out.writeBoolean(state.isTableIdCaseSensitive());
+        writeTableSchemas(state.getTableSchemas(), out);
     }
 
     private void serializeHybridPendingSplitsState(
@@ -176,6 +183,23 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
         List<MySqlSnapshotSplit> remainingSplits = readMySqlSnapshotSplits(splitVersion, in);
         Map<String, MySqlSnapshotSplit> assignedSnapshotSplits =
                 readAssignedSnapshotSplits(splitVersion, in);
+
+        final List<MySqlSchemalessSnapshotSplit> remainingSchemaLessSplits = new ArrayList<>();
+        final Map<String, MySqlSchemalessSnapshotSplit> assignedSchemaLessSnapshotSplits =
+                new HashMap<>();
+        final Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>();
+        remainingSplits.forEach(
+                split -> {
+                    tableSchemas.putAll(split.getTableSchemas());
+                    remainingSchemaLessSplits.add(split.toSchemaLessSnapshotSplit());
+                });
+        assignedSnapshotSplits
+                .forEach((key, value) -> {
+                    tableSchemas.putAll(value.getTableSchemas());
+                    assignedSchemaLessSnapshotSplits.put(
+                            key, value.toSchemaLessSnapshotSplit());
+                });
+
         Map<String, BinlogOffset> finishedOffsets = readFinishedOffsets(splitVersion, in);
         AssignerStatus assignerStatus;
         boolean isAssignerFinished = in.readBoolean();
@@ -187,13 +211,13 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
 
         return new SnapshotPendingSplitsState(
                 alreadyProcessedTables,
-                remainingSplits,
-                assignedSnapshotSplits,
+                remainingSchemaLessSplits,
+                assignedSchemaLessSnapshotSplits,
                 finishedOffsets,
                 assignerStatus,
                 new ArrayList<>(),
                 false,
-                false);
+                false, tableSchemas);
     }
 
     private HybridPendingSplitsState deserializeLegacyHybridPendingSplitsState(
@@ -204,7 +228,7 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
         return new HybridPendingSplitsState(snapshotPendingSplitsState, isBinlogSplitAssigned);
     }
 
-    private SnapshotPendingSplitsState deserializeSnapshotPendingSplitsState(
+    private SnapshotPendingSplitsState deserializeSnapshotPendingSplitsState(int version,
             int splitVersion, DataInputDeserializer in) throws IOException {
         List<TableId> alreadyProcessedTables = readTableIds(in);
         List<MySqlSnapshotSplit> remainingSplits = readMySqlSnapshotSplits(splitVersion, in);
@@ -212,33 +236,51 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
                 readAssignedSnapshotSplits(splitVersion, in);
         Map<String, BinlogOffset> finishedOffsets = readFinishedOffsets(splitVersion, in);
         AssignerStatus assignerStatus;
-        if (splitVersion < 4) {
+        if (version >= 3) {
+            assignerStatus = fromStatusCode(in.readInt());
+        } else {
             boolean isAssignerFinished = in.readBoolean();
             if (isAssignerFinished) {
                 assignerStatus = AssignerStatus.INITIAL_ASSIGNING_FINISHED;
             } else {
                 assignerStatus = AssignerStatus.INITIAL_ASSIGNING;
             }
-        } else {
-            assignerStatus = AssignerStatus.fromStatusCode(in.readInt());
         }
         List<TableId> remainingTableIds = readTableIds(in);
         boolean isTableIdCaseSensitive = in.readBoolean();
+        final List<MySqlSchemalessSnapshotSplit> remainingSchemaLessSplits = new ArrayList<>();
+        final Map<String, MySqlSchemalessSnapshotSplit> assignedSchemaLessSnapshotSplits =
+                new HashMap<>();
+        final Map<TableId, TableChange> tableSchemas = new HashMap<>();
+        remainingSplits.forEach(
+                split -> {
+                    tableSchemas.putAll(split.getTableSchemas());
+                    remainingSchemaLessSplits.add(split.toSchemaLessSnapshotSplit());
+                });
+        assignedSnapshotSplits
+                .forEach((key, value) -> {
+                    tableSchemas.putAll(value.getTableSchemas());
+                    assignedSchemaLessSnapshotSplits.put(
+                            key, value.toSchemaLessSnapshotSplit());
+                });
+        if (version >= 4) {
+            tableSchemas.putAll(readTableSchemas(splitVersion, in));
+        }
         return new SnapshotPendingSplitsState(
                 alreadyProcessedTables,
-                remainingSplits,
-                assignedSnapshotSplits,
+                remainingSchemaLessSplits,
+                assignedSchemaLessSnapshotSplits,
                 finishedOffsets,
                 assignerStatus,
                 remainingTableIds,
                 isTableIdCaseSensitive,
-                true);
+                true, tableSchemas);
     }
 
     private HybridPendingSplitsState deserializeHybridPendingSplitsState(
-            int splitVersion, DataInputDeserializer in) throws IOException {
+            int version, int splitVersion, DataInputDeserializer in) throws IOException {
         SnapshotPendingSplitsState snapshotPendingSplitsState =
-                deserializeSnapshotPendingSplitsState(splitVersion, in);
+                deserializeSnapshotPendingSplitsState(version, splitVersion, in);
         boolean isBinlogSplitAssigned = in.readBoolean();
         return new HybridPendingSplitsState(snapshotPendingSplitsState, isBinlogSplitAssigned);
     }
@@ -275,11 +317,11 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
     }
 
     private void writeAssignedSnapshotSplits(
-            Map<String, MySqlSnapshotSplit> assignedSplits, DataOutputSerializer out)
+            Map<String, MySqlSchemalessSnapshotSplit> assignedSplits, DataOutputSerializer out)
             throws IOException {
         final int size = assignedSplits.size();
         out.writeInt(size);
-        for (Map.Entry<String, MySqlSnapshotSplit> entry : assignedSplits.entrySet()) {
+        for (Map.Entry<String, MySqlSchemalessSnapshotSplit> entry : assignedSplits.entrySet()) {
             out.writeUTF(entry.getKey());
             byte[] splitBytes = splitSerializer.serialize(entry.getValue());
             out.writeInt(splitBytes.length);
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/SnapshotPendingSplitsState.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/SnapshotPendingSplitsState.java
index d7d3a27b7..5091b7e69 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/SnapshotPendingSplitsState.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/SnapshotPendingSplitsState.java
@@ -18,9 +18,10 @@
 package org.apache.inlong.sort.cdc.mysql.source.assigners.state;
 
 import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges.TableChange;
 import org.apache.inlong.sort.cdc.mysql.source.assigners.AssignerStatus;
 import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffset;
-import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSnapshotSplit;
+import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSchemalessSnapshotSplit;
 
 import java.util.List;
 import java.util.Map;
@@ -45,13 +46,13 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
     /**
      * The splits in the checkpoint.
      */
-    private final List<MySqlSnapshotSplit> remainingSplits;
+    private final List<MySqlSchemalessSnapshotSplit> remainingSplits;
 
     /**
      * The snapshot splits that the {@link MySqlSourceEnumerator} has assigned to {@link
      * MySqlSplitReader}s.
      */
-    private final Map<String, MySqlSnapshotSplit> assignedSplits;
+    private final Map<String, MySqlSchemalessSnapshotSplit> assignedSplits;
 
     /**
      * The offsets of finished (snapshot) splits that the {@link MySqlSourceEnumerator} has received
@@ -74,15 +75,17 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
      */
     private final boolean isRemainingTablesCheckpointed;
 
+    private final Map<TableId, TableChange> tableSchemas;
+
     public SnapshotPendingSplitsState(
             List<TableId> alreadyProcessedTables,
-            List<MySqlSnapshotSplit> remainingSplits,
-            Map<String, MySqlSnapshotSplit> assignedSplits,
+            List<MySqlSchemalessSnapshotSplit> remainingSplits,
+            Map<String, MySqlSchemalessSnapshotSplit> assignedSplits,
             Map<String, BinlogOffset> splitFinishedOffsets,
             AssignerStatus assignerStatus,
             List<TableId> remainingTables,
             boolean isTableIdCaseSensitive,
-            boolean isRemainingTablesCheckpointed) {
+            boolean isRemainingTablesCheckpointed, Map<TableId, TableChange> tableSchemas) {
         this.alreadyProcessedTables = alreadyProcessedTables;
         this.remainingSplits = remainingSplits;
         this.assignedSplits = assignedSplits;
@@ -91,17 +94,22 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
         this.remainingTables = remainingTables;
         this.isTableIdCaseSensitive = isTableIdCaseSensitive;
         this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
+        this.tableSchemas = tableSchemas;
+    }
+
+    public Map<TableId, TableChange> getTableSchemas() {
+        return tableSchemas;
     }
 
     public List<TableId> getAlreadyProcessedTables() {
         return alreadyProcessedTables;
     }
 
-    public List<MySqlSnapshotSplit> getRemainingSplits() {
+    public List<MySqlSchemalessSnapshotSplit> getRemainingSplits() {
         return remainingSplits;
     }
 
-    public Map<String, MySqlSnapshotSplit> getAssignedSplits() {
+    public Map<String, MySqlSchemalessSnapshotSplit> getAssignedSplits() {
         return assignedSplits;
     }
 
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSchemalessSnapshotSplit.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSchemalessSnapshotSplit.java
new file mode 100644
index 000000000..60aa4eeaa
--- /dev/null
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSchemalessSnapshotSplit.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.mysql.source.split;
+
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges.TableChange;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffset;
+
+/**
+ * A {@link MySqlSnapshotSplit} that carries no table schemas, to reduce memory usage.
+ * Adapted from {@code com.ververica.cdc.connectors.mysql.source.split.MySqlSchemalessSnapshotSplit}.
+ */
+public class MySqlSchemalessSnapshotSplit extends MySqlSnapshotSplit {
+    /** Creates a split without table schemas; the parent receives an empty schema map. */
+    public MySqlSchemalessSnapshotSplit(
+            TableId tableId,
+            String splitId,
+            RowType splitKeyType,
+            Object[] splitStart,
+            Object[] splitEnd,
+            BinlogOffset highWatermark) {
+        super(
+                tableId,
+                splitId,
+                splitKeyType,
+                splitStart,
+                splitEnd,
+                highWatermark,
+                new HashMap<>(1));
+    }
+
+    /**
+     * Rebuilds a full {@link MySqlSnapshotSplit} from this split, mapping this split's table id to
+     * {@code tableSchema}; a {@code null} schema is stored as-is (it is not validated here).
+     */
+    public final MySqlSnapshotSplit toMySqlSnapshotSplit(
+            TableChange tableSchema) {
+        Map<TableId, TableChange> tableSchemas = new HashMap<>();
+        tableSchemas.put(getTableId(), tableSchema);
+        return new MySqlSnapshotSplit(
+                getTableId(),
+                splitId(),
+                getSplitKeyType(),
+                getSplitStart(),
+                getSplitEnd(),
+                getHighWatermark(),
+                tableSchemas);
+    }
+}
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSnapshotSplit.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSnapshotSplit.java
index 195407840..fa9faf5f6 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSnapshotSplit.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSnapshotSplit.java
@@ -157,4 +157,11 @@ public class MySqlSnapshotSplit extends MySqlSplit {
                 + highWatermark
                 + '}';
     }
+
+    /** Returns a new {@link MySqlSchemalessSnapshotSplit} copy of this split, without table schemas. */
+    public final MySqlSchemalessSnapshotSplit toSchemaLessSnapshotSplit() {
+        return new MySqlSchemalessSnapshotSplit(
+                tableId, splitId, splitKeyType, splitStart, splitEnd, highWatermark);
+    }
+
 }
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplit.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplit.java
index 5dfc88bd0..a3369b5a1 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplit.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplit.java
@@ -43,7 +43,8 @@ public abstract class MySqlSplit implements SourceSplit {
 
     /** Checks whether this split is a snapshot split. */
     public final boolean isSnapshotSplit() {
-        return getClass() == MySqlSnapshotSplit.class;
+        return getClass() == MySqlSnapshotSplit.class // exact class match: list every snapshot subclass
+                || getClass() == MySqlSchemalessSnapshotSplit.class;
     }
 
     /** Checks whether this split is a binlog split. */
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
index 6bb625779..93c3b4353 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
@@ -58,7 +58,7 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS
     private static final int BINLOG_SPLIT_FLAG = 2;
     private static final int METRIC_SPLIT_FLAG = 3;
 
-    private static void writeTableSchemas(
+    public static void writeTableSchemas(
             Map<TableId, TableChange> tableSchemas, DataOutputSerializer out) throws IOException {
         FlinkJsonTableChangeSerializer jsonSerializer = new FlinkJsonTableChangeSerializer();
         DocumentWriter documentWriter = DocumentWriter.defaultWriter();
@@ -74,7 +74,7 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS
         }
     }
 
-    private static Map<TableId, TableChange> readTableSchemas(int version, DataInputDeserializer in)
+    public static Map<TableId, TableChange> readTableSchemas(int version, DataInputDeserializer in)
             throws IOException {
         DocumentReader documentReader = DocumentReader.defaultReader();
         Map<TableId, TableChange> tableSchemas = new HashMap<>();
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 69998097e..0157061a0 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -345,6 +345,7 @@
       inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/RecordUtils.java
       inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/TemporalConversions.java
       inlong-sort/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSnapshotSplit.java
+      inlong-sort/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSchemalessSnapshotSplit.java
       inlong-sort/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReaderContext.java
       inlong-sort/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/schema/MySqlTypeUtils.java
       inlong-sort/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/StatementUtils.java