You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/30 06:05:15 UTC
[inlong] branch master updated: [INLONG-7697][Sort] Reduce the memory usage of JM when split table chunks (#7726)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ecff1b430 [INLONG-7697][Sort] Reduce the memory usage of JM when split table chunks (#7726)
ecff1b430 is described below
commit ecff1b4307df5345b5f08c55d56c5ad897c444ad
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Thu Mar 30 14:05:09 2023 +0800
[INLONG-7697][Sort] Reduce the memory usage of JM when split table chunks (#7726)
---
.../source/assigners/MySqlHybridSplitAssigner.java | 6 +-
.../assigners/MySqlSnapshotSplitAssigner.java | 61 +++++++++++------
.../state/PendingSplitsStateSerializer.java | 80 +++++++++++++++++-----
.../state/SnapshotPendingSplitsState.java | 24 ++++---
.../source/split/MySqlSchemalessSnapshotSplit.java | 67 ++++++++++++++++++
.../cdc/mysql/source/split/MySqlSnapshotSplit.java | 7 ++
.../sort/cdc/mysql/source/split/MySqlSplit.java | 3 +-
.../mysql/source/split/MySqlSplitSerializer.java | 4 +-
licenses/inlong-sort-connectors/LICENSE | 1 +
9 files changed, 200 insertions(+), 53 deletions(-)
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlHybridSplitAssigner.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlHybridSplitAssigner.java
index ccf82d3ae..8c7bc4ecc 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlHybridSplitAssigner.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlHybridSplitAssigner.java
@@ -24,7 +24,7 @@ import org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceConfig;
import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffset;
import org.apache.inlong.sort.cdc.mysql.source.split.FinishedSnapshotSplitInfo;
import org.apache.inlong.sort.cdc.mysql.source.split.MySqlBinlogSplit;
-import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSnapshotSplit;
+import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSchemalessSnapshotSplit;
import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -186,7 +186,7 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
// --------------------------------------------------------------------------------------------
private MySqlBinlogSplit createBinlogSplit() {
- final List<MySqlSnapshotSplit> assignedSnapshotSplit =
+ final List<MySqlSchemalessSnapshotSplit> assignedSnapshotSplit =
snapshotSplitAssigner.getAssignedSplits().values().stream()
.sorted(Comparator.comparing(MySqlSplit::splitId))
.collect(Collectors.toList());
@@ -196,7 +196,7 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
BinlogOffset minBinlogOffset = null;
- for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
+ for (MySqlSchemalessSnapshotSplit split : assignedSnapshotSplit) {
// find the min binlog offset
BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
if (minBinlogOffset == null || binlogOffset.isBefore(minBinlogOffset)) {
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
index d270fa792..fb85c8c30 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.cdc.mysql.source.assigners;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
@@ -29,6 +30,7 @@ import org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceConfig;
import org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions;
import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffset;
import org.apache.inlong.sort.cdc.mysql.source.split.FinishedSnapshotSplitInfo;
+import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSchemalessSnapshotSplit;
import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSnapshotSplit;
import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSplit;
import org.slf4j.Logger;
@@ -65,14 +67,14 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
private static final Logger LOG = LoggerFactory.getLogger(MySqlSnapshotSplitAssigner.class);
private final List<TableId> alreadyProcessedTables;
- private final List<MySqlSnapshotSplit> remainingSplits;
- private final Map<String, MySqlSnapshotSplit> assignedSplits;
+ private final List<MySqlSchemalessSnapshotSplit> remainingSplits;
+ private final Map<String, MySqlSchemalessSnapshotSplit> assignedSplits;
private final Map<String, BinlogOffset> splitFinishedOffsets;
private final MySqlSourceConfig sourceConfig;
private final int currentParallelism;
private final List<TableId> remainingTables;
private final boolean isRemainingTablesCheckpointed;
-
+ private final Map<TableId, TableChanges.TableChange> tableSchemas;
private AssignerStatus assignerStatus;
private ChunkSplitter chunkSplitter;
private boolean isTableIdCaseSensitive;
@@ -98,7 +100,8 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
AssignerStatus.INITIAL_ASSIGNING,
remainingTables,
isTableIdCaseSensitive,
- true);
+ true,
+ new HashMap<>());
}
public MySqlSnapshotSplitAssigner(
@@ -115,25 +118,28 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
checkpoint.getSnapshotAssignerStatus(),
checkpoint.getRemainingTables(),
checkpoint.isTableIdCaseSensitive(),
- checkpoint.isRemainingTablesCheckpointed());
+ checkpoint.isRemainingTablesCheckpointed(),
+ checkpoint.getTableSchemas());
}
private MySqlSnapshotSplitAssigner(
MySqlSourceConfig sourceConfig,
int currentParallelism,
List<TableId> alreadyProcessedTables,
- List<MySqlSnapshotSplit> remainingSplits,
- Map<String, MySqlSnapshotSplit> assignedSplits,
+ List<MySqlSchemalessSnapshotSplit> remainingSplits,
+ Map<String, MySqlSchemalessSnapshotSplit> assignedSplits,
Map<String, BinlogOffset> splitFinishedOffsets,
AssignerStatus assignerStatus,
List<TableId> remainingTables,
boolean isTableIdCaseSensitive,
- boolean isRemainingTablesCheckpointed) {
+ boolean isRemainingTablesCheckpointed,
+ Map<TableId, TableChanges.TableChange> tableSchemas) {
this.sourceConfig = sourceConfig;
this.currentParallelism = currentParallelism;
this.alreadyProcessedTables = alreadyProcessedTables;
this.remainingSplits = new CopyOnWriteArrayList<>(remainingSplits);
this.assignedSplits = assignedSplits;
+ this.tableSchemas = tableSchemas;
this.splitFinishedOffsets = splitFinishedOffsets;
this.assignerStatus = assignerStatus;
this.remainingTables = new CopyOnWriteArrayList<>(remainingTables);
@@ -206,14 +212,23 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
executor.submit(
() -> {
try {
- Iterator<TableId> iterator = remainingTables.iterator();
- while (iterator.hasNext()) {
- TableId nextTable = iterator.next();
+
+ for (TableId nextTable : remainingTables) {
// split the given table into chunks (snapshot splits)
Collection<MySqlSnapshotSplit> splits =
chunkSplitter.generateSplits(nextTable);
+
+ if (!splits.isEmpty()) {
+ tableSchemas.putAll(new HashMap<>(
+ splits.iterator().next().getTableSchemas()));
+ }
+
+ final List<MySqlSchemalessSnapshotSplit> schemaLessSnapshotSplits =
+ splits.stream()
+ .map(MySqlSnapshotSplit::toSchemaLessSnapshotSplit)
+ .collect(Collectors.toList());
synchronized (lock) {
- remainingSplits.addAll(splits);
+ remainingSplits.addAll(schemaLessSnapshotSplits);
remainingTables.remove(nextTable);
lock.notify();
}
@@ -230,12 +245,13 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
synchronized (lock) {
if (!remainingSplits.isEmpty()) {
// return remaining splits firstly
- Iterator<MySqlSnapshotSplit> iterator = remainingSplits.iterator();
- MySqlSnapshotSplit split = iterator.next();
+ Iterator<MySqlSchemalessSnapshotSplit> iterator = remainingSplits.iterator();
+ MySqlSchemalessSnapshotSplit split = iterator.next();
remainingSplits.remove(split);
assignedSplits.put(split.splitId(), split);
addAlreadyProcessedTablesIfNotExists(split.getTableId());
- return Optional.of(split);
+ return Optional.of(
+ split.toMySqlSnapshotSplit(tableSchemas.get(split.getTableId())));
} else if (!remainingTables.isEmpty()) {
try {
// wait for the asynchronous split to complete
@@ -265,12 +281,12 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
throw new FlinkRuntimeException(
"The assigner is not ready to offer finished split information, this should not be called");
}
- final List<MySqlSnapshotSplit> assignedSnapshotSplit =
+ final List<MySqlSchemalessSnapshotSplit> assignedSnapshotSplit =
assignedSplits.values().stream()
.sorted(Comparator.comparing(MySqlSplit::splitId))
.collect(Collectors.toList());
List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
- for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
+ for (MySqlSchemalessSnapshotSplit split : assignedSnapshotSplit) {
BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
finishedSnapshotSplitInfos.add(
new FinishedSnapshotSplitInfo(
@@ -305,7 +321,8 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
@Override
public void addSplits(Collection<MySqlSplit> splits) {
for (MySqlSplit split : splits) {
- remainingSplits.add(split.asSnapshotSplit());
+ tableSchemas.putAll(split.asSnapshotSplit().getTableSchemas());
+ remainingSplits.add(split.asSnapshotSplit().toSchemaLessSnapshotSplit());
// we should remove the add-backed splits from the assigned list,
// because they are failed
assignedSplits.remove(split.splitId());
@@ -324,7 +341,8 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
assignerStatus,
remainingTables,
isTableIdCaseSensitive,
- true);
+ true,
+ tableSchemas);
// we need a complete checkpoint before mark this assigner to be finished, to wait for all
// records of snapshot splits are completely processed
if (checkpointIdToFinish == null
@@ -392,7 +410,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
return remainingTables.isEmpty() && remainingSplits.isEmpty();
}
- public Map<String, MySqlSnapshotSplit> getAssignedSplits() {
+ public Map<String, MySqlSchemalessSnapshotSplit> getAssignedSplits() {
return assignedSplits;
}
@@ -402,6 +420,9 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
return splitFinishedOffsets;
}
+ public Map<TableId, TableChanges.TableChange> getTableSchemas() {
+ return tableSchemas; // live internal map, not a copy; NOTE(review): the async splitting task also writes it outside `lock` - confirm callers tolerate concurrent updates
+ }
/**
* Returns whether all splits are finished which means no more splits and all assigned splits
* are finished.
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/PendingSplitsStateSerializer.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/PendingSplitsStateSerializer.java
index 7bd20baf2..8d45ca8f5 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/PendingSplitsStateSerializer.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/PendingSplitsStateSerializer.java
@@ -18,11 +18,14 @@
package org.apache.inlong.sort.cdc.mysql.source.assigners.state;
import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import io.debezium.relational.history.TableChanges.TableChange;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.inlong.sort.cdc.mysql.source.assigners.AssignerStatus;
import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffset;
+import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSchemalessSnapshotSplit;
import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSnapshotSplit;
import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSplit;
@@ -33,6 +36,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.inlong.sort.cdc.mysql.source.assigners.AssignerStatus.fromStatusCode;
+import static org.apache.inlong.sort.cdc.mysql.source.split.MySqlSplitSerializer.readTableSchemas;
+import static org.apache.inlong.sort.cdc.mysql.source.split.MySqlSplitSerializer.writeTableSchemas;
import static org.apache.inlong.sort.cdc.mysql.source.utils.SerializerUtils.readBinlogPosition;
import static org.apache.inlong.sort.cdc.mysql.source.utils.SerializerUtils.writeBinlogPosition;
@@ -42,7 +48,7 @@ import static org.apache.inlong.sort.cdc.mysql.source.utils.SerializerUtils.writ
*/
public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<PendingSplitsState> {
- private static final int VERSION = 3;
+ private static final int VERSION = 4;
private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
@@ -101,7 +107,7 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
return deserializeLegacyPendingSplitsState(serialized);
case 3:
case 4:
- return deserializePendingSplitsState(serialized);
+ return deserializePendingSplitsState(version, serialized);
default:
throw new IOException("Unknown version: " + version);
}
@@ -124,14 +130,14 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
}
}
- public PendingSplitsState deserializePendingSplitsState(byte[] serialized) throws IOException {
+ public PendingSplitsState deserializePendingSplitsState(int version, byte[] serialized) throws IOException {
final DataInputDeserializer in = new DataInputDeserializer(serialized);
final int splitVersion = in.readInt();
final int stateFlag = in.readInt();
if (stateFlag == SNAPSHOT_PENDING_SPLITS_STATE_FLAG) {
- return deserializeSnapshotPendingSplitsState(splitVersion, in);
+ return deserializeSnapshotPendingSplitsState(version, splitVersion, in);
} else if (stateFlag == HYBRID_PENDING_SPLITS_STATE_FLAG) {
- return deserializeHybridPendingSplitsState(splitVersion, in);
+ return deserializeHybridPendingSplitsState(version, splitVersion, in);
} else if (stateFlag == BINLOG_PENDING_SPLITS_STATE_FLAG) {
return deserializeBinlogPendingSplitsState(in);
} else {
@@ -153,6 +159,7 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
out.writeInt(state.getSnapshotAssignerStatus().getStatusCode());
writeTableIds(state.getRemainingTables(), out);
out.writeBoolean(state.isTableIdCaseSensitive());
+ writeTableSchemas(state.getTableSchemas(), out);
}
private void serializeHybridPendingSplitsState(
@@ -176,6 +183,23 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
List<MySqlSnapshotSplit> remainingSplits = readMySqlSnapshotSplits(splitVersion, in);
Map<String, MySqlSnapshotSplit> assignedSnapshotSplits =
readAssignedSnapshotSplits(splitVersion, in);
+
+ final List<MySqlSchemalessSnapshotSplit> remainingSchemaLessSplits = new ArrayList<>();
+ final Map<String, MySqlSchemalessSnapshotSplit> assignedSchemaLessSnapshotSplits =
+ new HashMap<>();
+ final Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>();
+ remainingSplits.forEach(
+ split -> {
+ tableSchemas.putAll(split.getTableSchemas());
+ remainingSchemaLessSplits.add(split.toSchemaLessSnapshotSplit());
+ });
+ assignedSnapshotSplits
+ .forEach((key, value) -> {
+ tableSchemas.putAll(value.getTableSchemas());
+ assignedSchemaLessSnapshotSplits.put(
+ key, value.toSchemaLessSnapshotSplit());
+ });
+
Map<String, BinlogOffset> finishedOffsets = readFinishedOffsets(splitVersion, in);
AssignerStatus assignerStatus;
boolean isAssignerFinished = in.readBoolean();
@@ -187,13 +211,13 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
return new SnapshotPendingSplitsState(
alreadyProcessedTables,
- remainingSplits,
- assignedSnapshotSplits,
+ remainingSchemaLessSplits,
+ assignedSchemaLessSnapshotSplits,
finishedOffsets,
assignerStatus,
new ArrayList<>(),
false,
- false);
+ false, tableSchemas);
}
private HybridPendingSplitsState deserializeLegacyHybridPendingSplitsState(
@@ -204,7 +228,7 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
return new HybridPendingSplitsState(snapshotPendingSplitsState, isBinlogSplitAssigned);
}
- private SnapshotPendingSplitsState deserializeSnapshotPendingSplitsState(
+ private SnapshotPendingSplitsState deserializeSnapshotPendingSplitsState(int version,
int splitVersion, DataInputDeserializer in) throws IOException {
List<TableId> alreadyProcessedTables = readTableIds(in);
List<MySqlSnapshotSplit> remainingSplits = readMySqlSnapshotSplits(splitVersion, in);
@@ -212,33 +236,51 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
readAssignedSnapshotSplits(splitVersion, in);
Map<String, BinlogOffset> finishedOffsets = readFinishedOffsets(splitVersion, in);
AssignerStatus assignerStatus;
- if (splitVersion < 4) {
+ if (version >= 3) {
+ assignerStatus = fromStatusCode(in.readInt());
+ } else {
boolean isAssignerFinished = in.readBoolean();
if (isAssignerFinished) {
assignerStatus = AssignerStatus.INITIAL_ASSIGNING_FINISHED;
} else {
assignerStatus = AssignerStatus.INITIAL_ASSIGNING;
}
- } else {
- assignerStatus = AssignerStatus.fromStatusCode(in.readInt());
}
List<TableId> remainingTableIds = readTableIds(in);
boolean isTableIdCaseSensitive = in.readBoolean();
+ final List<MySqlSchemalessSnapshotSplit> remainingSchemaLessSplits = new ArrayList<>();
+ final Map<String, MySqlSchemalessSnapshotSplit> assignedSchemaLessSnapshotSplits =
+ new HashMap<>();
+ final Map<TableId, TableChange> tableSchemas = new HashMap<>();
+ remainingSplits.forEach(
+ split -> {
+ tableSchemas.putAll(split.getTableSchemas());
+ remainingSchemaLessSplits.add(split.toSchemaLessSnapshotSplit());
+ });
+ assignedSnapshotSplits
+ .forEach((key, value) -> {
+ tableSchemas.putAll(value.getTableSchemas());
+ assignedSchemaLessSnapshotSplits.put(
+ key, value.toSchemaLessSnapshotSplit());
+ });
+ if (version >= 4) {
+ tableSchemas.putAll(readTableSchemas(splitVersion, in));
+ }
return new SnapshotPendingSplitsState(
alreadyProcessedTables,
- remainingSplits,
- assignedSnapshotSplits,
+ remainingSchemaLessSplits,
+ assignedSchemaLessSnapshotSplits,
finishedOffsets,
assignerStatus,
remainingTableIds,
isTableIdCaseSensitive,
- true);
+ true, tableSchemas);
}
private HybridPendingSplitsState deserializeHybridPendingSplitsState(
- int splitVersion, DataInputDeserializer in) throws IOException {
+ int version, int splitVersion, DataInputDeserializer in) throws IOException {
SnapshotPendingSplitsState snapshotPendingSplitsState =
- deserializeSnapshotPendingSplitsState(splitVersion, in);
+ deserializeSnapshotPendingSplitsState(version, splitVersion, in);
boolean isBinlogSplitAssigned = in.readBoolean();
return new HybridPendingSplitsState(snapshotPendingSplitsState, isBinlogSplitAssigned);
}
@@ -275,11 +317,11 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
}
private void writeAssignedSnapshotSplits(
- Map<String, MySqlSnapshotSplit> assignedSplits, DataOutputSerializer out)
+ Map<String, MySqlSchemalessSnapshotSplit> assignedSplits, DataOutputSerializer out)
throws IOException {
final int size = assignedSplits.size();
out.writeInt(size);
- for (Map.Entry<String, MySqlSnapshotSplit> entry : assignedSplits.entrySet()) {
+ for (Map.Entry<String, MySqlSchemalessSnapshotSplit> entry : assignedSplits.entrySet()) {
out.writeUTF(entry.getKey());
byte[] splitBytes = splitSerializer.serialize(entry.getValue());
out.writeInt(splitBytes.length);
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/SnapshotPendingSplitsState.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/SnapshotPendingSplitsState.java
index d7d3a27b7..5091b7e69 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/SnapshotPendingSplitsState.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/SnapshotPendingSplitsState.java
@@ -18,9 +18,10 @@
package org.apache.inlong.sort.cdc.mysql.source.assigners.state;
import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges.TableChange;
import org.apache.inlong.sort.cdc.mysql.source.assigners.AssignerStatus;
import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffset;
-import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSnapshotSplit;
+import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSchemalessSnapshotSplit;
import java.util.List;
import java.util.Map;
@@ -45,13 +46,13 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
/**
* The splits in the checkpoint.
*/
- private final List<MySqlSnapshotSplit> remainingSplits;
+ private final List<MySqlSchemalessSnapshotSplit> remainingSplits;
/**
* The snapshot splits that the {@link MySqlSourceEnumerator} has assigned to {@link
* MySqlSplitReader}s.
*/
- private final Map<String, MySqlSnapshotSplit> assignedSplits;
+ private final Map<String, MySqlSchemalessSnapshotSplit> assignedSplits;
/**
* The offsets of finished (snapshot) splits that the {@link MySqlSourceEnumerator} has received
@@ -74,15 +75,17 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
*/
private final boolean isRemainingTablesCheckpointed;
+ private final Map<TableId, TableChange> tableSchemas;
+
public SnapshotPendingSplitsState(
List<TableId> alreadyProcessedTables,
- List<MySqlSnapshotSplit> remainingSplits,
- Map<String, MySqlSnapshotSplit> assignedSplits,
+ List<MySqlSchemalessSnapshotSplit> remainingSplits,
+ Map<String, MySqlSchemalessSnapshotSplit> assignedSplits,
Map<String, BinlogOffset> splitFinishedOffsets,
AssignerStatus assignerStatus,
List<TableId> remainingTables,
boolean isTableIdCaseSensitive,
- boolean isRemainingTablesCheckpointed) {
+ boolean isRemainingTablesCheckpointed, Map<TableId, TableChange> tableSchemas) {
this.alreadyProcessedTables = alreadyProcessedTables;
this.remainingSplits = remainingSplits;
this.assignedSplits = assignedSplits;
@@ -91,17 +94,22 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
this.remainingTables = remainingTables;
this.isTableIdCaseSensitive = isTableIdCaseSensitive;
this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
+ this.tableSchemas = tableSchemas;
+ }
+
+ public Map<TableId, TableChange> getTableSchemas() {
+ return tableSchemas;
}
public List<TableId> getAlreadyProcessedTables() {
return alreadyProcessedTables;
}
- public List<MySqlSnapshotSplit> getRemainingSplits() {
+ public List<MySqlSchemalessSnapshotSplit> getRemainingSplits() {
return remainingSplits;
}
- public Map<String, MySqlSnapshotSplit> getAssignedSplits() {
+ public Map<String, MySqlSchemalessSnapshotSplit> getAssignedSplits() {
return assignedSplits;
}
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSchemalessSnapshotSplit.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSchemalessSnapshotSplit.java
new file mode 100644
index 000000000..60aa4eeaa
--- /dev/null
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSchemalessSnapshotSplit.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.mysql.source.split;
+
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges.TableChange;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffset;
+
+/**
+ * A {@link MySqlSnapshotSplit} without table schemas; the assigner keeps one schema per table to save memory.
+ * Ported from {@code com.ververica.cdc.connectors.mysql.source.split.MySqlSchemalessSnapshotSplit}.
+ */
+public class MySqlSchemalessSnapshotSplit extends MySqlSnapshotSplit {
+
+ public MySqlSchemalessSnapshotSplit(
+ TableId tableId,
+ String splitId,
+ RowType splitKeyType,
+ Object[] splitStart,
+ Object[] splitEnd,
+ BinlogOffset highWatermark) {
+ super(
+ tableId,
+ splitId,
+ splitKeyType,
+ splitStart,
+ splitEnd,
+ highWatermark,
+ new HashMap<>(1)); // empty schema map: no schema is stored per split
+ }
+
+ /**
+ * Rebuilds a full {@link MySqlSnapshotSplit} whose schema map holds only {@code tableSchema} for this split's table.
+ * @param tableSchema schema stored under {@link #getTableId()}; it is not null-checked here
+ */
+ public final MySqlSnapshotSplit toMySqlSnapshotSplit(
+ TableChange tableSchema) {
+ Map<TableId, TableChange> tableSchemas = new HashMap<>();
+ tableSchemas.put(getTableId(), tableSchema);
+ return new MySqlSnapshotSplit(
+ getTableId(),
+ splitId(),
+ getSplitKeyType(),
+ getSplitStart(),
+ getSplitEnd(),
+ getHighWatermark(),
+ tableSchemas);
+ }
+}
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSnapshotSplit.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSnapshotSplit.java
index 195407840..fa9faf5f6 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSnapshotSplit.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSnapshotSplit.java
@@ -157,4 +157,11 @@ public class MySqlSnapshotSplit extends MySqlSplit {
+ highWatermark
+ '}';
}
+
+ /** Returns a new {@link MySqlSchemalessSnapshotSplit} with this split's fields but without its table schemas. */
+ public final MySqlSchemalessSnapshotSplit toSchemaLessSnapshotSplit() {
+ return new MySqlSchemalessSnapshotSplit(
+ tableId, splitId, splitKeyType, splitStart, splitEnd, highWatermark);
+ }
+
}
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplit.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplit.java
index 5dfc88bd0..a3369b5a1 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplit.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplit.java
@@ -43,7 +43,8 @@ public abstract class MySqlSplit implements SourceSplit {
/** Checks whether this split is a snapshot split. */
public final boolean isSnapshotSplit() {
- return getClass() == MySqlSnapshotSplit.class;
+ return getClass() == MySqlSnapshotSplit.class
+ || getClass() == MySqlSchemalessSnapshotSplit.class; // exact-class match, so each snapshot-split subclass must be listed explicitly
}
/** Checks whether this split is a binlog split. */
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
index 6bb625779..93c3b4353 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
@@ -58,7 +58,7 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS
private static final int BINLOG_SPLIT_FLAG = 2;
private static final int METRIC_SPLIT_FLAG = 3;
- private static void writeTableSchemas(
+ public static void writeTableSchemas(
Map<TableId, TableChange> tableSchemas, DataOutputSerializer out) throws IOException {
FlinkJsonTableChangeSerializer jsonSerializer = new FlinkJsonTableChangeSerializer();
DocumentWriter documentWriter = DocumentWriter.defaultWriter();
@@ -74,7 +74,7 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS
}
}
- private static Map<TableId, TableChange> readTableSchemas(int version, DataInputDeserializer in)
+ public static Map<TableId, TableChange> readTableSchemas(int version, DataInputDeserializer in)
throws IOException {
DocumentReader documentReader = DocumentReader.defaultReader();
Map<TableId, TableChange> tableSchemas = new HashMap<>();
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 69998097e..0157061a0 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -345,6 +345,7 @@
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/RecordUtils.java
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/TemporalConversions.java
inlong-sort/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSnapshotSplit.java
+ inlong-sort/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSchemalessSnapshotSplit.java
inlong-sort/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReaderContext.java
inlong-sort/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/schema/MySqlTypeUtils.java
inlong-sort/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/StatementUtils.java