You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/04/26 11:44:12 UTC

[incubator-inlong] branch master updated: [INLONG-3953][Sort] Add MySQL dynamic table implementation - modified from Flink CDC (#3955)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 119e14583 [INLONG-3953][Sort] Add MySQL dynamic table implementation - modified from Flink CDC (#3955)
119e14583 is described below

commit 119e145837d8b1c5af5487a112a62f5fc40c90d3
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Tue Apr 26 19:44:07 2022 +0800

    [INLONG-3953][Sort] Add MySQL dynamic table implementation - modified from Flink CDC (#3955)
    
    * [INLONG-3953][Sort] add mysql table schema, modified from flinkCDC
    
    * [INLONG-3953][Sort] fix checkstyle
    
    * [INLONG-3953][Sort] add hikariCP dependency
---
 inlong-sort/sort-single-tenant/pom.xml             |   4 +
 .../debezium/DebeziumDeserializationSchema.java    |  12 +-
 .../flink/cdc/debezium/DebeziumSourceFunction.java |  33 +-
 .../JsonDebeziumDeserializationSchema.java         |  98 ------
 .../StringDebeziumDeserializationSchema.java       |  49 ---
 .../history/FlinkJsonTableChangeSerializer.java    | 207 ------------
 .../debezium/internal/DebeziumChangeConsumer.java  | 103 ------
 .../debezium/internal/DebeziumChangeFetcher.java   | 309 -----------------
 .../cdc/debezium/internal/DebeziumOffset.java      |  64 ----
 .../internal/DebeziumOffsetSerializer.java         |  41 ---
 .../debezium/internal/FlinkDatabaseHistory.java    | 116 -------
 .../internal/FlinkDatabaseSchemaHistory.java       | 199 -----------
 .../debezium/internal/FlinkOffsetBackingStore.java | 201 -----------
 .../flink/cdc/debezium/internal/Handover.java      | 194 -----------
 .../flink/cdc/debezium/internal/SchemaRecord.java  |  95 ------
 .../cdc/debezium/utils/DatabaseHistoryUtil.java    |   2 +-
 .../flink/cdc/debezium/utils/ObjectUtils.java      | 104 ++++++
 .../flink/cdc/mysql/DebeziumUtils.java             | 197 +++++++++++
 .../singletenant/flink/cdc/mysql/MySqlSource.java  | 240 +++++++++++++
 .../flink/cdc/mysql/MySqlValidator.java            | 161 +++++++++
 .../cdc/mysql/SeekBinlogToTimestampFilter.java     |  93 ++++++
 .../flink/cdc/mysql/config/MySqlSourceConfig.java  | 207 ++++++++++++
 .../flink/cdc/mysql/config/MySqlSourceOptions.java | 237 +++++++++++++
 .../flink/cdc/mysql/config/ServerIdRange.java      | 112 +++++++
 .../cdc/mysql/connection/ConnectionPoolId.java     |  57 ++++
 .../cdc/mysql/connection/ConnectionPools.java      |  35 ++
 .../mysql/connection/JdbcConnectionFactory.java    |  76 +++++
 .../cdc/mysql/connection/JdbcConnectionPools.java  |  56 ++++
 .../mysql/connection/PooledDataSourceFactory.java  |  89 +++++
 .../flink/cdc/mysql/table/JdbcUrlUtils.java        |  52 +++
 .../MySqlDeserializationConverterFactory.java      | 153 +++++++++
 .../cdc/mysql/table/MySqlReadableMetadata.java     | 355 ++++++++++++++++++++
 .../flink/cdc/mysql/table/MySqlTableSource.java    | 370 +++++++++++++++++++++
 .../cdc/mysql/table/MySqlTableSourceFactory.java   | 308 +++++++++++++++++
 .../flink/cdc/mysql/table/StartupMode.java         |  36 ++
 .../flink/cdc/mysql/table/StartupOptions.java      | 129 +++++++
 36 files changed, 3092 insertions(+), 1702 deletions(-)

diff --git a/inlong-sort/sort-single-tenant/pom.xml b/inlong-sort/sort-single-tenant/pom.xml
index 29eb8ead2..a4e5ad628 100644
--- a/inlong-sort/sort-single-tenant/pom.xml
+++ b/inlong-sort/sort-single-tenant/pom.xml
@@ -210,6 +210,10 @@
             <artifactId>debezium-core</artifactId>
             <version>${debezium-core.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.zaxxer</groupId>
+            <artifactId>HikariCP</artifactId>
+        </dependency>
 
     </dependencies>
 
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/DebeziumDeserializationSchema.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/DebeziumDeserializationSchema.java
index a23fe10e3..c47d2220a 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/DebeziumDeserializationSchema.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/DebeziumDeserializationSchema.java
@@ -21,6 +21,7 @@ package org.apache.inlong.sort.singletenant.flink.cdc.debezium;
 import io.debezium.relational.history.TableChanges.TableChange;
 import java.io.Serializable;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.util.Collector;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -32,11 +33,14 @@ import org.apache.kafka.connect.source.SourceRecord;
  * @param <T> The type created by the deserialization schema.
  */
 @PublicEvolving
-public interface DebeziumDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+public interface DebeziumDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T>,
+    com.ververica.cdc.debezium.DebeziumDeserializationSchema<T> {
 
-    /** Deserialize the Debezium record, it is represented in Kafka {@link SourceRecord}. */
-    void deserialize(SourceRecord record, Collector<T> out) throws Exception;
-    
     void deserialize(SourceRecord record, Collector<T> out, TableChange tableChange) throws Exception;
 
+    @Override
+    void deserialize(SourceRecord sourceRecord, Collector<T> collector) throws Exception;
+
+    @Override
+    TypeInformation<T> getProducedType();
 }
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/DebeziumSourceFunction.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/DebeziumSourceFunction.java
index 321b9ac14..60ef2bc4f 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/DebeziumSourceFunction.java
@@ -21,15 +21,15 @@ package org.apache.inlong.sort.singletenant.flink.cdc.debezium;
 import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
 import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
 
-import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.DebeziumChangeConsumer;
-import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.DebeziumChangeFetcher;
-import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.DebeziumOffset;
-import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.DebeziumOffsetSerializer;
-import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.FlinkDatabaseHistory;
-import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.FlinkDatabaseSchemaHistory;
-import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.FlinkOffsetBackingStore;
-import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.Handover;
-import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.SchemaRecord;
+import com.ververica.cdc.debezium.internal.DebeziumChangeConsumer;
+import com.ververica.cdc.debezium.internal.DebeziumChangeFetcher;
+import com.ververica.cdc.debezium.internal.DebeziumOffset;
+import com.ververica.cdc.debezium.internal.DebeziumOffsetSerializer;
+import com.ververica.cdc.debezium.internal.FlinkDatabaseHistory;
+import com.ververica.cdc.debezium.internal.FlinkDatabaseSchemaHistory;
+import com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore;
+import com.ververica.cdc.debezium.internal.Handover;
+import com.ververica.cdc.debezium.internal.SchemaRecord;
 import io.debezium.document.DocumentReader;
 import io.debezium.document.DocumentWriter;
 import io.debezium.embedded.Connect;
@@ -165,12 +165,6 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
     /** Accessor for state in the operator state backend. */
     private transient ListState<byte[]> offsetState;
 
-    /**
-     * State to store the history records, i.e. schema changes.
-     *
-     * @see FlinkDatabaseHistory
-     * @see FlinkDatabaseSchemaHistory
-     */
     private transient ListState<String> schemaRecordsState;
 
     // ---------------------------------------------------------------------------------------
@@ -179,10 +173,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
 
     private transient ExecutorService executor;
     private transient DebeziumEngine<?> engine;
-    /**
-     * Unique name of this Debezium Engine instance across all the jobs. Currently we randomly
-     * generate a UUID for it. This is used for {@link FlinkDatabaseHistory}.
-     */
+
     private transient String engineInstanceName;
 
     /** Consume the events from the engine and commit the offset to the engine. */
@@ -393,9 +384,9 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
                         Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(),
                         Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());
         this.debeziumChangeFetcher =
-                new DebeziumChangeFetcher<>(
+                new DebeziumChangeFetcher<T>(
                         sourceContext,
-                        deserializer,
+                    deserializer,
                         restoredOffsetState == null, // DB snapshot phase if restore state is null
                         dbzHeartbeatPrefix,
                         handover);
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/JsonDebeziumDeserializationSchema.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/JsonDebeziumDeserializationSchema.java
deleted file mode 100644
index 1539a3570..000000000
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/JsonDebeziumDeserializationSchema.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.singletenant.flink.cdc.debezium;
-
-import io.debezium.relational.history.TableChanges.TableChange;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.util.Collector;
-import org.apache.kafka.connect.json.JsonConverter;
-import org.apache.kafka.connect.json.JsonConverterConfig;
-import org.apache.kafka.connect.source.SourceRecord;
-import org.apache.kafka.connect.storage.ConverterConfig;
-import org.apache.kafka.connect.storage.ConverterType;
-
-/**
- * A JSON format implementation of {@link DebeziumDeserializationSchema} which deserializes the
- * received {@link SourceRecord} to JSON String.
- */
-public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
-
-    private static final long serialVersionUID = 1L;
-
-    private transient JsonConverter jsonConverter;
-
-    /**
-     * Configuration whether to enable {@link JsonConverterConfig#SCHEMAS_ENABLE_CONFIG} to include
-     * schema in messages.
-     */
-    private final Boolean includeSchema;
-
-    /** The custom configurations for {@link JsonConverter}. */
-    private Map<String, Object> customConverterConfigs;
-
-    public JsonDebeziumDeserializationSchema() {
-        this(false);
-    }
-
-    public JsonDebeziumDeserializationSchema(Boolean includeSchema) {
-        this.includeSchema = includeSchema;
-    }
-
-    public JsonDebeziumDeserializationSchema(
-            Boolean includeSchema, Map<String, Object> customConverterConfigs) {
-        this.includeSchema = includeSchema;
-        this.customConverterConfigs = customConverterConfigs;
-    }
-
-    @Override
-    public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
-        if (jsonConverter == null) {
-            initializeJsonConverter();
-        }
-        byte[] bytes =
-                jsonConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
-        out.collect(new String(bytes));
-    }
-
-    @Override
-    public void deserialize(SourceRecord record, Collector<String> out, TableChange tableChange)
-        throws Exception {
-
-    }
-
-    /** Initialize {@link JsonConverter} with given configs. */
-    private void initializeJsonConverter() {
-        jsonConverter = new JsonConverter();
-        final HashMap<String, Object> configs = new HashMap<>(2);
-        configs.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName());
-        configs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, includeSchema);
-        if (customConverterConfigs != null) {
-            configs.putAll(customConverterConfigs);
-        }
-        jsonConverter.configure(configs);
-    }
-
-    @Override
-    public TypeInformation<String> getProducedType() {
-        return BasicTypeInfo.STRING_TYPE_INFO;
-    }
-}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/StringDebeziumDeserializationSchema.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/StringDebeziumDeserializationSchema.java
deleted file mode 100644
index 1b7931abd..000000000
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/StringDebeziumDeserializationSchema.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.singletenant.flink.cdc.debezium;
-
-import io.debezium.relational.history.TableChanges.TableChange;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.util.Collector;
-import org.apache.kafka.connect.source.SourceRecord;
-
-/**
- * A simple implementation of {@link DebeziumDeserializationSchema} which converts the received
- * {@link SourceRecord} into String.
- */
-public class StringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
-    private static final long serialVersionUID = -3168848963265670603L;
-
-    @Override
-    public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
-        out.collect(record.toString());
-    }
-
-    @Override
-    public void deserialize(SourceRecord record, Collector<String> out, TableChange tableChange)
-        throws Exception {
-
-    }
-
-    @Override
-    public TypeInformation<String> getProducedType() {
-        return BasicTypeInfo.STRING_TYPE_INFO;
-    }
-}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/history/FlinkJsonTableChangeSerializer.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/history/FlinkJsonTableChangeSerializer.java
deleted file mode 100644
index 856d7fa83..000000000
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/history/FlinkJsonTableChangeSerializer.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.singletenant.flink.cdc.debezium.history;
-
-import io.debezium.document.Array;
-import io.debezium.document.Array.Entry;
-import io.debezium.document.Document;
-import io.debezium.document.Value;
-import io.debezium.relational.Column;
-import io.debezium.relational.ColumnEditor;
-import io.debezium.relational.Table;
-import io.debezium.relational.TableEditor;
-import io.debezium.relational.TableId;
-import io.debezium.relational.history.TableChanges;
-import io.debezium.relational.history.TableChanges.TableChange;
-import io.debezium.relational.history.TableChanges.TableChangeType;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-
-/**
- * The serializer responsible for converting of {@link TableChanges} into a JSON format. Copied from
- * io.debezium.relational.history.JsonTableChangeSerializer, but add serialization/deserialization
- * for column's enumValues
- */
-public class FlinkJsonTableChangeSerializer implements TableChanges.TableChangesSerializer<Array> {
-
-    @Override
-    public Array serialize(TableChanges tableChanges) {
-        List<Value> values =
-                StreamSupport.stream(tableChanges.spliterator(), false)
-                        .map(this::toDocument)
-                        .map(Value::create)
-                        .collect(Collectors.toList());
-
-        return Array.create(values);
-    }
-
-    public Document toDocument(TableChange tableChange) {
-        Document document = Document.create();
-
-        document.setString("type", tableChange.getType().name());
-        document.setString("id", tableChange.getId().toDoubleQuotedString());
-        document.setDocument("table", toDocument(tableChange.getTable()));
-        return document;
-    }
-
-    private Document toDocument(Table table) {
-        Document document = Document.create();
-
-        document.set("defaultCharsetName", table.defaultCharsetName());
-        document.set("primaryKeyColumnNames", Array.create(table.primaryKeyColumnNames()));
-
-        List<Document> columns =
-                table.columns().stream().map(this::toDocument).collect(Collectors.toList());
-
-        document.setArray("columns", Array.create(columns));
-
-        return document;
-    }
-
-    private Document toDocument(Column column) {
-        Document document = Document.create();
-
-        document.setString("name", column.name());
-        document.setNumber("jdbcType", column.jdbcType());
-
-        if (column.nativeType() != Column.UNSET_INT_VALUE) {
-            document.setNumber("nativeType", column.nativeType());
-        }
-
-        document.setString("typeName", column.typeName());
-        document.setString("typeExpression", column.typeExpression());
-        document.setString("charsetName", column.charsetName());
-
-        if (column.length() != Column.UNSET_INT_VALUE) {
-            document.setNumber("length", column.length());
-        }
-
-        column.scale().ifPresent(s -> document.setNumber("scale", s));
-
-        document.setNumber("position", column.position());
-        document.setBoolean("optional", column.isOptional());
-        document.setBoolean("autoIncremented", column.isAutoIncremented());
-        document.setBoolean("generated", column.isGenerated());
-
-        // BEGIN FLINK MODIFICATION
-        document.setArray("enumValues", column.enumValues().toArray());
-        // END FLINK MODIFICATION
-
-        return document;
-    }
-
-    @Override
-    public TableChanges deserialize(Array array, boolean useCatalogBeforeSchema) {
-        TableChanges tableChanges = new TableChanges();
-
-        for (Entry entry : array) {
-            TableChange change =
-                    fromDocument(entry.getValue().asDocument(), useCatalogBeforeSchema);
-
-            if (change.getType() == TableChangeType.CREATE) {
-                tableChanges.create(change.getTable());
-            } else if (change.getType() == TableChangeType.ALTER) {
-                tableChanges.alter(change.getTable());
-            } else if (change.getType() == TableChangeType.DROP) {
-                tableChanges.drop(change.getTable());
-            }
-        }
-
-        return tableChanges;
-    }
-
-    private static Table fromDocument(TableId id, Document document) {
-        TableEditor editor =
-                Table.editor()
-                        .tableId(id)
-                        .setDefaultCharsetName(document.getString("defaultCharsetName"));
-
-        document.getArray("columns")
-                .streamValues()
-                .map(Value::asDocument)
-                .map(
-                        v -> {
-                            ColumnEditor columnEditor =
-                                    Column.editor()
-                                            .name(v.getString("name"))
-                                            .jdbcType(v.getInteger("jdbcType"));
-
-                            Integer nativeType = v.getInteger("nativeType");
-                            if (nativeType != null) {
-                                columnEditor.nativeType(nativeType);
-                            }
-
-                            columnEditor
-                                    .type(v.getString("typeName"), v.getString("typeExpression"))
-                                    .charsetName(v.getString("charsetName"));
-
-                            Integer length = v.getInteger("length");
-                            if (length != null) {
-                                columnEditor.length(length);
-                            }
-
-                            Integer scale = v.getInteger("scale");
-                            if (scale != null) {
-                                columnEditor.scale(scale);
-                            }
-
-                            columnEditor
-                                    .position(v.getInteger("position"))
-                                    .optional(v.getBoolean("optional"))
-                                    .autoIncremented(v.getBoolean("autoIncremented"))
-                                    .generated(v.getBoolean("generated"));
-
-                            // BEGIN FLINK MODIFICATION
-                            Array enumValues = v.getArray("enumValues");
-                            if (enumValues != null && !enumValues.isEmpty()) {
-                                columnEditor.enumValues(
-                                        enumValues
-                                                .streamValues()
-                                                .map(Value::asString)
-                                                .collect(Collectors.toList()));
-                            }
-                            // END FLINK MODIFICATION
-
-                            return columnEditor.create();
-                        })
-                .forEach(editor::addColumn);
-
-        editor.setPrimaryKeyNames(
-                document.getArray("primaryKeyColumnNames")
-                        .streamValues()
-                        .map(Value::asString)
-                        .collect(Collectors.toList()));
-
-        return editor.create();
-    }
-
-    public static TableChange fromDocument(Document document, boolean useCatalogBeforeSchema) {
-        TableChangeType type = TableChangeType.valueOf(document.getString("type"));
-        TableId id = TableId.parse(document.getString("id"), useCatalogBeforeSchema);
-        Table table = null;
-
-        if (type == TableChangeType.CREATE || type == TableChangeType.ALTER) {
-            table = fromDocument(id, document.getDocument("table"));
-        } else {
-            table = Table.editor().tableId(id).create();
-        }
-        return new TableChange(type, table);
-    }
-}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumChangeConsumer.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumChangeConsumer.java
deleted file mode 100644
index bb31eadec..000000000
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumChangeConsumer.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
-
-import io.debezium.embedded.EmbeddedEngineChangeEvent;
-import io.debezium.engine.ChangeEvent;
-import io.debezium.engine.DebeziumEngine;
-import io.debezium.engine.DebeziumEngine.RecordCommitter;
-import java.util.List;
-import java.util.Map;
-import org.apache.flink.annotation.Internal;
-import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.source.SourceRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** Consume debezium change events. */
-@Internal
-public class DebeziumChangeConsumer
-        implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
-    public static final String LAST_COMPLETELY_PROCESSED_LSN_KEY = "lsn_proc";
-    public static final String LAST_COMMIT_LSN_KEY = "lsn_commit";
-    private static final Logger LOG = LoggerFactory.getLogger(DebeziumChangeConsumer.class);
-
-    private final Handover handover;
-    // keep the modification is visible to the source function
-    private volatile RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> currentCommitter;
-
-    public DebeziumChangeConsumer(Handover handover) {
-        this.handover = handover;
-    }
-
-    @Override
-    public void handleBatch(
-            List<ChangeEvent<SourceRecord, SourceRecord>> events,
-            RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> recordCommitter) {
-        try {
-            currentCommitter = recordCommitter;
-            handover.produce(events);
-        } catch (Throwable e) {
-            // Hold this exception in handover and trigger the fetcher to exit
-            handover.reportError(e);
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    public void commitOffset(DebeziumOffset offset) throws InterruptedException {
-        // Although the committer is read/write by multi-thread, the committer will be not changed
-        // frequently.
-        if (currentCommitter == null) {
-            LOG.info(
-                    "commitOffset() called on Debezium change consumer which doesn't receive records yet.");
-            return;
-        }
-
-        // only the offset is used
-        SourceRecord recordWrapper =
-                new SourceRecord(
-                        offset.sourcePartition,
-                        adjustSourceOffset((Map<String, Object>) offset.sourceOffset),
-                        "DUMMY",
-                        Schema.BOOLEAN_SCHEMA,
-                        true);
-        EmbeddedEngineChangeEvent<SourceRecord, SourceRecord> changeEvent =
-                new EmbeddedEngineChangeEvent<>(null, recordWrapper, recordWrapper);
-        currentCommitter.markProcessed(changeEvent);
-        currentCommitter.markBatchFinished();
-    }
-
-    /**
-     * We have to adjust type of LSN values to Long, because it might be Integer after
-     * deserialization, however {@code
-     * io.debezium.connector.postgresql.PostgresStreamingChangeEventSource#commitOffset(java.util.Map)}
-     * requires Long.
-     */
-    private Map<String, Object> adjustSourceOffset(Map<String, Object> sourceOffset) {
-        if (sourceOffset.containsKey(LAST_COMPLETELY_PROCESSED_LSN_KEY)) {
-            String value = sourceOffset.get(LAST_COMPLETELY_PROCESSED_LSN_KEY).toString();
-            sourceOffset.put(LAST_COMPLETELY_PROCESSED_LSN_KEY, Long.parseLong(value));
-        }
-        if (sourceOffset.containsKey(LAST_COMMIT_LSN_KEY)) {
-            String value = sourceOffset.get(LAST_COMMIT_LSN_KEY).toString();
-            sourceOffset.put(LAST_COMMIT_LSN_KEY, Long.parseLong(value));
-        }
-        return sourceOffset;
-    }
-}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumChangeFetcher.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumChangeFetcher.java
deleted file mode 100644
index ea44e7026..000000000
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumChangeFetcher.java
+++ /dev/null
@@ -1,309 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
-
-import org.apache.inlong.sort.singletenant.flink.cdc.debezium.DebeziumDeserializationSchema;
-import io.debezium.connector.SnapshotRecord;
-import io.debezium.data.Envelope;
-import io.debezium.engine.ChangeEvent;
-import io.debezium.engine.DebeziumEngine;
-import java.util.ArrayDeque;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.util.Collector;
-import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.data.Struct;
-import org.apache.kafka.connect.source.SourceRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A Handler that convert change messages from {@link DebeziumEngine} to data in Flink. Considering
- * Debezium in different mode has different strategies to hold the lock, e.g. snapshot, the handler
- * also needs different strategy. In snapshot phase, the handler needs to hold the lock until the
- * snapshot finishes. But in non-snapshot phase, the handler only needs to hold the lock when
- * emitting the records.
- *
- * @param <T> The type of elements produced by the handler.
- */
-@Internal
-public class DebeziumChangeFetcher<T> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(DebeziumChangeFetcher.class);
-
-    private final SourceFunction.SourceContext<T> sourceContext;
-
-    /**
-     * The lock that guarantees that record emission and state updates are atomic, from the view of
-     * taking a checkpoint.
-     */
-    private final Object checkpointLock;
-
-    /** The schema to convert from Debezium's messages into Flink's objects. */
-    private final DebeziumDeserializationSchema<T> deserialization;
-
-    /** A collector to emit records in batch (bundle). */
-    private final DebeziumCollector debeziumCollector;
-
-    private final DebeziumOffset debeziumOffset;
-
-    private final DebeziumOffsetSerializer stateSerializer;
-
-    private final String heartbeatTopicPrefix;
-
-    private boolean isInDbSnapshotPhase;
-
-    private final Handover handover;
-
-    private volatile boolean isRunning = true;
-
-    // ---------------------------------------------------------------------------------------
-    // Metrics
-    // ---------------------------------------------------------------------------------------
-
-    /** Timestamp of change event. If the event is a snapshot event, the timestamp is 0L. */
-    private volatile long messageTimestamp = 0L;
-
-    /** The last record processing time. */
-    private volatile long processTime = 0L;
-
-    /**
-     * currentFetchEventTimeLag = FetchTime - messageTimestamp, where the FetchTime is the time the
-     * record fetched into the source operator.
-     */
-    private volatile long fetchDelay = 0L;
-
-    /**
-     * emitDelay = EmitTime - messageTimestamp, where the EmitTime is the time the record leaves the
-     * source operator.
-     */
-    private volatile long emitDelay = 0L;
-
-    // ------------------------------------------------------------------------
-
-    public DebeziumChangeFetcher(
-            SourceFunction.SourceContext<T> sourceContext,
-            DebeziumDeserializationSchema<T> deserialization,
-            boolean isInDbSnapshotPhase,
-            String heartbeatTopicPrefix,
-            Handover handover) {
-        this.sourceContext = sourceContext;
-        this.checkpointLock = sourceContext.getCheckpointLock();
-        this.deserialization = deserialization;
-        this.isInDbSnapshotPhase = isInDbSnapshotPhase;
-        this.heartbeatTopicPrefix = heartbeatTopicPrefix;
-        this.debeziumCollector = new DebeziumCollector();
-        this.debeziumOffset = new DebeziumOffset();
-        this.stateSerializer = DebeziumOffsetSerializer.INSTANCE;
-        this.handover = handover;
-    }
-
-    /**
-     * Take a snapshot of the Debezium handler state.
-     *
-     * <p>Important: This method must be called under the checkpoint lock.
-     */
-    public byte[] snapshotCurrentState() throws Exception {
-        // this method assumes that the checkpoint lock is held
-        assert Thread.holdsLock(checkpointLock);
-        if (debeziumOffset.sourceOffset == null || debeziumOffset.sourcePartition == null) {
-            return null;
-        }
-
-        return stateSerializer.serialize(debeziumOffset);
-    }
-
-    /**
-     * Process change messages from the {@link Handover} and collect the processed messages by
-     * {@link Collector}.
-     */
-    public void runFetchLoop() throws Exception {
-        try {
-            // begin snapshot database phase
-            if (isInDbSnapshotPhase) {
-                List<ChangeEvent<SourceRecord, SourceRecord>> events = handover.pollNext();
-
-                synchronized (checkpointLock) {
-                    LOG.info(
-                            "Database snapshot phase can't perform checkpoint, acquired Checkpoint lock.");
-                    handleBatch(events);
-                    while (isRunning && isInDbSnapshotPhase) {
-                        handleBatch(handover.pollNext());
-                    }
-                }
-                LOG.info("Received record from streaming binlog phase, released checkpoint lock.");
-            }
-
-            // begin streaming binlog phase
-            while (isRunning) {
-                // If the handover is closed or has errors, exit.
-                // If there is no streaming phase, the handover will be closed by the engine.
-                handleBatch(handover.pollNext());
-            }
-        } catch (Handover.ClosedException e) {
-            // ignore
-        }
-    }
-
-    public void close() {
-        isRunning = false;
-        handover.close();
-    }
-
-    // ---------------------------------------------------------------------------------------
-    // Metric getter
-    // ---------------------------------------------------------------------------------------
-
-    /**
-     * The metric indicates delay from data generation to entry into the system.
-     *
-     * <p>Note: the metric is available during the binlog phase. Use 0 to indicate the metric is
-     * unavailable.
-     */
-    public long getFetchDelay() {
-        return fetchDelay;
-    }
-
-    /**
-     * The metric indicates delay from data generation to leaving the source operator.
-     *
-     * <p>Note: the metric is available during the binlog phase. Use 0 to indicate the metric is
-     * unavailable.
-     */
-    public long getEmitDelay() {
-        return emitDelay;
-    }
-
-    public long getIdleTime() {
-        return System.currentTimeMillis() - processTime;
-    }
-
-    // ---------------------------------------------------------------------------------------
-    // Helper
-    // ---------------------------------------------------------------------------------------
-
-    private void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> changeEvents)
-            throws Exception {
-        if (CollectionUtils.isEmpty(changeEvents)) {
-            return;
-        }
-        this.processTime = System.currentTimeMillis();
-
-        for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) {
-            SourceRecord record = event.value();
-            updateMessageTimestamp(record);
-            fetchDelay = isInDbSnapshotPhase ? 0L : processTime - messageTimestamp;
-
-            if (isHeartbeatEvent(record)) {
-                // keep offset update
-                synchronized (checkpointLock) {
-                    debeziumOffset.setSourcePartition(record.sourcePartition());
-                    debeziumOffset.setSourceOffset(record.sourceOffset());
-                }
-                // drop heartbeat events
-                continue;
-            }
-
-            deserialization.deserialize(record, debeziumCollector);
-
-            if (!isSnapshotRecord(record)) {
-                LOG.debug("Snapshot phase finishes.");
-                isInDbSnapshotPhase = false;
-            }
-
-            // emit the actual records. this also updates offset state atomically
-            emitRecordsUnderCheckpointLock(
-                    debeziumCollector.records, record.sourcePartition(), record.sourceOffset());
-        }
-    }
-
-    private void emitRecordsUnderCheckpointLock(
-            Queue<T> records, Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
-        // Emit the records. Use the checkpoint lock to guarantee
-        // atomicity of record emission and offset state update.
-        // The synchronized checkpointLock is reentrant. It's safe to sync again in snapshot mode.
-        synchronized (checkpointLock) {
-            T record;
-            while ((record = records.poll()) != null) {
-                emitDelay =
-                        isInDbSnapshotPhase ? 0L : System.currentTimeMillis() - messageTimestamp;
-                sourceContext.collect(record);
-            }
-            // update offset to state
-            debeziumOffset.setSourcePartition(sourcePartition);
-            debeziumOffset.setSourceOffset(sourceOffset);
-        }
-    }
-
-    private void updateMessageTimestamp(SourceRecord record) {
-        Schema schema = record.valueSchema();
-        Struct value = (Struct) record.value();
-        if (schema.field(Envelope.FieldName.SOURCE) == null) {
-            return;
-        }
-
-        Struct source = value.getStruct(Envelope.FieldName.SOURCE);
-        if (source.schema().field(Envelope.FieldName.TIMESTAMP) == null) {
-            return;
-        }
-
-        Long tsMs = source.getInt64(Envelope.FieldName.TIMESTAMP);
-        if (tsMs != null) {
-            this.messageTimestamp = tsMs;
-        }
-    }
-
-    private boolean isHeartbeatEvent(SourceRecord record) {
-        String topic = record.topic();
-        return topic != null && topic.startsWith(heartbeatTopicPrefix);
-    }
-
-    private boolean isSnapshotRecord(SourceRecord record) {
-        Struct value = (Struct) record.value();
-        if (value != null) {
-            Struct source = value.getStruct(Envelope.FieldName.SOURCE);
-            SnapshotRecord snapshotRecord = SnapshotRecord.fromSource(source);
-            // even if it is the last record of snapshot, i.e. SnapshotRecord.LAST
-            // we can still recover from checkpoint and continue to read the binlog,
-            // because the checkpoint contains binlog position
-            return SnapshotRecord.TRUE == snapshotRecord;
-        }
-        return false;
-    }
-
-    // ---------------------------------------------------------------------------------------
-
-    private class DebeziumCollector implements Collector<T> {
-
-        private final Queue<T> records = new ArrayDeque<>();
-
-        @Override
-        public void collect(T record) {
-            records.add(record);
-        }
-
-        @Override
-        public void close() {
-        }
-    }
-}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumOffset.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumOffset.java
deleted file mode 100644
index a51aad8d4..000000000
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumOffset.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
-
-import java.io.Serializable;
-import java.util.Map;
-import org.apache.flink.annotation.Internal;
-
-/**
- * The state that the Flink Debezium Consumer holds for each instance.
- *
- * <p>This class describes the most basic state that Debezium used for recovering based on Kafka
- * Connect mechanism. It includes a sourcePartition and sourceOffset.
- *
- * <p>The sourcePartition represents a single input sourcePartition that the record came from (e.g.
- * a filename, table name, or topic-partition). The sourceOffset represents a position in that
- * sourcePartition which can be used to resume consumption of data.
- *
- * <p>These values can have arbitrary structure and should be represented using
- * org.apache.kafka.connect.data objects (or primitive values). For example, a database connector
- * might specify the sourcePartition as a record containing { "db": "database_name", "table":
- * "table_name"} and the sourceOffset as a Long containing the timestamp of the row.
- */
-@Internal
-public class DebeziumOffset implements Serializable {
-    private static final long serialVersionUID = 1L;
-
-    public Map<String, ?> sourcePartition;
-    public Map<String, ?> sourceOffset;
-
-    public void setSourcePartition(Map<String, ?> sourcePartition) {
-        this.sourcePartition = sourcePartition;
-    }
-
-    public void setSourceOffset(Map<String, ?> sourceOffset) {
-        this.sourceOffset = sourceOffset;
-    }
-
-    @Override
-    public String toString() {
-        return "DebeziumOffset{"
-                + "sourcePartition="
-                + sourcePartition
-                + ", sourceOffset="
-                + sourceOffset
-                + '}';
-    }
-}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumOffsetSerializer.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumOffsetSerializer.java
deleted file mode 100644
index 9f193a8d9..000000000
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumOffsetSerializer.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
-
-import java.io.IOException;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-
-/** Serializer implementation for a {@link DebeziumOffset}. */
-@Internal
-public class DebeziumOffsetSerializer {
-    public static final DebeziumOffsetSerializer INSTANCE = new DebeziumOffsetSerializer();
-
-    public byte[] serialize(DebeziumOffset debeziumOffset) throws IOException {
-        // we currently use JSON serialization for simplification, as the state is very small.
-        // we can improve this in the future if needed
-        ObjectMapper objectMapper = new ObjectMapper();
-        return objectMapper.writeValueAsBytes(debeziumOffset);
-    }
-
-    public DebeziumOffset deserialize(byte[] bytes) throws IOException {
-        ObjectMapper objectMapper = new ObjectMapper();
-        return objectMapper.readValue(bytes, DebeziumOffset.class);
-    }
-}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkDatabaseHistory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkDatabaseHistory.java
deleted file mode 100644
index 77315db48..000000000
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkDatabaseHistory.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
-
-import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
-import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.DatabaseHistoryUtil.removeHistory;
-import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
-
-import io.debezium.config.Configuration;
-import io.debezium.relational.history.AbstractDatabaseHistory;
-import io.debezium.relational.history.DatabaseHistoryException;
-import io.debezium.relational.history.DatabaseHistoryListener;
-import io.debezium.relational.history.HistoryRecord;
-import io.debezium.relational.history.HistoryRecordComparator;
-import java.util.Collection;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.function.Consumer;
-
-/**
- * Inspired from {@link io.debezium.relational.history.MemoryDatabaseHistory} but we will store the
- * HistoryRecords in Flink's state for persistence.
- *
- * <p>Note: This is not a clean solution because we depends on a global variable and all the history
- * records will be stored in state (grow infinitely). We may need to come up with a
- * FileSystemDatabaseHistory in the future to store history in HDFS.
- */
-public class FlinkDatabaseHistory extends AbstractDatabaseHistory {
-
-    public static final String DATABASE_HISTORY_INSTANCE_NAME = "database.history.instance.name";
-
-    private ConcurrentLinkedQueue<SchemaRecord> schemaRecords;
-    private String instanceName;
-
-    /** Gets the registered HistoryRecords under the given instance name. */
-    private ConcurrentLinkedQueue<SchemaRecord> getRegisteredHistoryRecord(String instanceName) {
-        Collection<SchemaRecord> historyRecords = retrieveHistory(instanceName);
-        return new ConcurrentLinkedQueue<>(historyRecords);
-    }
-
-    @Override
-    public void configure(
-            Configuration config,
-            HistoryRecordComparator comparator,
-            DatabaseHistoryListener listener,
-            boolean useCatalogBeforeSchema) {
-        super.configure(config, comparator, listener, useCatalogBeforeSchema);
-        this.instanceName = config.getString(DATABASE_HISTORY_INSTANCE_NAME);
-        this.schemaRecords = getRegisteredHistoryRecord(instanceName);
-
-        // register the schema changes into state
-        // every change should be visible to the source function
-        registerHistory(instanceName, schemaRecords);
-    }
-
-    @Override
-    public void stop() {
-        super.stop();
-        removeHistory(instanceName);
-    }
-
-    @Override
-    protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
-        this.schemaRecords.add(new SchemaRecord(record));
-    }
-
-    @Override
-    protected void recoverRecords(Consumer<HistoryRecord> records) {
-        this.schemaRecords.stream().map(SchemaRecord::getHistoryRecord).forEach(records);
-    }
-
-    @Override
-    public boolean exists() {
-        return !schemaRecords.isEmpty();
-    }
-
-    @Override
-    public boolean storageExists() {
-        return true;
-    }
-
-    @Override
-    public String toString() {
-        return "Flink Database History";
-    }
-
-    /**
-     * Determine whether the {@link FlinkDatabaseHistory} is compatible with the specified state.
-     */
-    public static boolean isCompatible(Collection<SchemaRecord> records) {
-        for (SchemaRecord record : records) {
-            // check the source/position/ddl is not null
-            if (!record.isHistoryRecord()) {
-                return false;
-            } else {
-                break;
-            }
-        }
-        return true;
-    }
-}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkDatabaseSchemaHistory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkDatabaseSchemaHistory.java
deleted file mode 100644
index 1027b18ed..000000000
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkDatabaseSchemaHistory.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
-
-import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
-import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.DatabaseHistoryUtil.removeHistory;
-import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
-import static io.debezium.relational.history.TableChanges.TableChange;
-
-import org.apache.inlong.sort.singletenant.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer;
-import io.debezium.config.Configuration;
-import io.debezium.relational.TableId;
-import io.debezium.relational.Tables;
-import io.debezium.relational.ddl.DdlParser;
-import io.debezium.relational.history.DatabaseHistory;
-import io.debezium.relational.history.DatabaseHistoryException;
-import io.debezium.relational.history.DatabaseHistoryListener;
-import io.debezium.relational.history.HistoryRecord;
-import io.debezium.relational.history.HistoryRecordComparator;
-import io.debezium.relational.history.TableChanges;
-import io.debezium.schema.DatabaseSchema;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * The {@link FlinkDatabaseSchemaHistory} only stores the latest schema of the monitored tables.
- * When recovering from the checkpoint, it should apply all the tables to the {@link
- * DatabaseSchema}, which doesn't need to replay the history anymore.
- *
- * <p>Considering the data structure maintained in the {@link FlinkDatabaseSchemaHistory} is much
- * different from the {@link FlinkDatabaseHistory}, it's not compatible with the {@link
- * FlinkDatabaseHistory}. Because it only maintains the latest schema of the table rather than all
- * history DDLs, it's useful to prevent OOM when meet massive history DDLs.
- */
-public class FlinkDatabaseSchemaHistory implements DatabaseHistory {
-
-    public static final String DATABASE_HISTORY_INSTANCE_NAME = "database.history.instance.name";
-
-    private final FlinkJsonTableChangeSerializer tableChangesSerializer =
-            new FlinkJsonTableChangeSerializer();
-
-    private ConcurrentMap<TableId, SchemaRecord> latestTables;
-    private String instanceName;
-    private DatabaseHistoryListener listener;
-    private boolean storeOnlyMonitoredTablesDdl;
-    private boolean skipUnparseableDDL;
-    private boolean useCatalogBeforeSchema;
-
-    @Override
-    public void configure(
-            Configuration config,
-            HistoryRecordComparator comparator,
-            DatabaseHistoryListener listener,
-            boolean useCatalogBeforeSchema) {
-        this.instanceName = config.getString(DATABASE_HISTORY_INSTANCE_NAME);
-        this.listener = listener;
-        this.storeOnlyMonitoredTablesDdl = config.getBoolean(STORE_ONLY_MONITORED_TABLES_DDL);
-        this.skipUnparseableDDL = config.getBoolean(SKIP_UNPARSEABLE_DDL_STATEMENTS);
-        this.useCatalogBeforeSchema = useCatalogBeforeSchema;
-
-        // recover
-        this.latestTables = new ConcurrentHashMap<>();
-        for (SchemaRecord schemaRecord : retrieveHistory(instanceName)) {
-            // validate here
-            TableChange tableChange =
-                    FlinkJsonTableChangeSerializer.fromDocument(
-                            schemaRecord.toDocument(), useCatalogBeforeSchema);
-            latestTables.put(tableChange.getId(), schemaRecord);
-        }
-        // register
-        registerHistory(instanceName, latestTables.values());
-    }
-
-    @Override
-    public void start() {
-        listener.started();
-    }
-
-    @Override
-    public void record(
-            Map<String, ?> source, Map<String, ?> position, String databaseName, String ddl)
-            throws DatabaseHistoryException {
-        throw new UnsupportedOperationException(
-                String.format(
-                        "The %s cannot work with 'debezium.internal.implementation' = 'legacy',"
-                                + "please use %s",
-                        FlinkDatabaseSchemaHistory.class.getCanonicalName(),
-                        FlinkDatabaseHistory.class.getCanonicalName()));
-    }
-
-    @Override
-    public void record(
-            Map<String, ?> source,
-            Map<String, ?> position,
-            String databaseName,
-            String schemaName,
-            String ddl,
-            TableChanges changes)
-            throws DatabaseHistoryException {
-        for (TableChanges.TableChange change : changes) {
-            switch (change.getType()) {
-                case CREATE:
-                case ALTER:
-                    latestTables.put(
-                            change.getId(),
-                            new SchemaRecord(tableChangesSerializer.toDocument(change)));
-                    break;
-                case DROP:
-                    latestTables.remove(change.getId());
-                    break;
-                default:
-                    // impossible
-                    throw new RuntimeException(
-                            String.format("Unknown change type: %s.", change.getType()));
-            }
-        }
-        listener.onChangeApplied(
-                new HistoryRecord(source, position, databaseName, schemaName, ddl, changes));
-    }
-
-    @Override
-    public void recover(
-            Map<String, ?> source, Map<String, ?> position, Tables schema, DdlParser ddlParser) {
-        listener.recoveryStarted();
-        for (SchemaRecord record : latestTables.values()) {
-            TableChange tableChange =
-                    FlinkJsonTableChangeSerializer.fromDocument(
-                            record.getTableChangeDoc(), useCatalogBeforeSchema);
-            schema.overwriteTable(tableChange.getTable());
-        }
-        listener.recoveryStopped();
-    }
-
-    @Override
-    public void stop() {
-        if (instanceName != null) {
-            removeHistory(instanceName);
-        }
-        listener.stopped();
-    }
-
-    @Override
-    public boolean exists() {
-        return latestTables != null && !latestTables.isEmpty();
-    }
-
-    @Override
-    public boolean storageExists() {
-        return true;
-    }
-
-    @Override
-    public void initializeStorage() {
-        // do nothing
-    }
-
-    @Override
-    public boolean storeOnlyMonitoredTables() {
-        return storeOnlyMonitoredTablesDdl;
-    }
-
-    @Override
-    public boolean skipUnparseableDdlStatements() {
-        return skipUnparseableDDL;
-    }
-
-    /**
-     * Determine whether the {@link FlinkDatabaseSchemaHistory} is compatible with the specified
-     * state.
-     */
-    public static boolean isCompatible(Collection<SchemaRecord> records) {
-        for (SchemaRecord record : records) {
-            if (!record.isTableChangeRecord()) {
-                return false;
-            } else {
-                break;
-            }
-        }
-        return true;
-    }
-}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkOffsetBackingStore.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkOffsetBackingStore.java
deleted file mode 100644
index d36516c29..000000000
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkOffsetBackingStore.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
-
-import org.apache.inlong.sort.singletenant.flink.cdc.debezium.DebeziumSourceFunction;
-import io.debezium.embedded.EmbeddedEngine;
-import io.debezium.engine.DebeziumEngine;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import org.apache.kafka.common.utils.ThreadUtils;
-import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.json.JsonConverter;
-import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.storage.Converter;
-import org.apache.kafka.connect.storage.OffsetBackingStore;
-import org.apache.kafka.connect.storage.OffsetStorageWriter;
-import org.apache.kafka.connect.util.Callback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An implementation of {@link OffsetBackingStore} backed on Flink's state mechanism.
- *
- * <p>The {@link #OFFSET_STATE_VALUE} in the {@link WorkerConfig} is the raw position and offset
- * data in JSON format. It is set into the config when recovery from failover by {@link
- * DebeziumSourceFunction} before startup the {@link DebeziumEngine}. If it is not a restoration,
- * the {@link #OFFSET_STATE_VALUE} is empty. {@link DebeziumEngine} relies on the {@link
- * OffsetBackingStore} for failover recovery.
- *
- * @see DebeziumSourceFunction
- */
-public class FlinkOffsetBackingStore implements OffsetBackingStore {
-    private static final Logger LOG = LoggerFactory.getLogger(FlinkOffsetBackingStore.class);
-
-    /** Config key under which the JSON offset state to restore from is looked up. */
-    public static final String OFFSET_STATE_VALUE = "offset.storage.flink.state.value";
-    /** Maximum time, in seconds, {@link #configure(WorkerConfig)} waits for the initial flush. */
-    public static final int FLUSH_TIMEOUT_SECONDS = 10;
-
-    /** The stored offsets; read and written by the tasks submitted to {@link #executor}. */
-    protected Map<ByteBuffer, ByteBuffer> data = new HashMap<>();
-    /** Single-threaded executor created by {@link #start()} and released by {@link #stop()}. */
-    protected ExecutorService executor;
-
-    /**
-     * Starts the executor and, when the config carries {@link #OFFSET_STATE_VALUE}, deserializes
-     * that JSON offset and writes it into this store through an {@link OffsetStorageWriter},
-     * waiting up to {@link #FLUSH_TIMEOUT_SECONDS} seconds for the flush to finish.
-     *
-     * <p>A failed, timed-out or interrupted flush is logged and cancelled rather than rethrown. If
-     * the waiting thread is interrupted, its interrupt status is restored before returning.
-     *
-     * @param config the worker configuration whose original entries are inspected
-     * @throws RuntimeException if the offset state cannot be deserialized
-     */
-    @Override
-    public void configure(WorkerConfig config) {
-        // eagerly initialize the executor, because OffsetStorageWriter will use it later
-        start();
-
-        Map<String, ?> conf = config.originals();
-        if (!conf.containsKey(OFFSET_STATE_VALUE)) {
-            // a normal startup from clean state, not need to initialize the offset
-            return;
-        }
-
-        String stateJson = (String) conf.get(OFFSET_STATE_VALUE);
-        DebeziumOffsetSerializer serializer = new DebeziumOffsetSerializer();
-        DebeziumOffset debeziumOffset;
-        try {
-            debeziumOffset = serializer.deserialize(stateJson.getBytes(StandardCharsets.UTF_8));
-        } catch (IOException e) {
-            LOG.error("Can't deserialize debezium offset state from JSON: " + stateJson, e);
-            throw new RuntimeException(e);
-        }
-
-        final String engineName = (String) conf.get(EmbeddedEngine.ENGINE_NAME.name());
-        Converter keyConverter = new JsonConverter();
-        Converter valueConverter = new JsonConverter();
-        keyConverter.configure(config.originals(), true);
-        Map<String, Object> valueConfigs = new HashMap<>(conf);
-        valueConfigs.put("schemas.enable", false);
-        valueConverter.configure(valueConfigs, true);
-        OffsetStorageWriter offsetWriter =
-                new OffsetStorageWriter(
-                        this,
-                        // must use engineName as namespace to align with Debezium Engine
-                        // implementation
-                        engineName,
-                        keyConverter,
-                        valueConverter);
-
-        offsetWriter.offset(debeziumOffset.sourcePartition, debeziumOffset.sourceOffset);
-
-        // flush immediately
-        if (!offsetWriter.beginFlush()) {
-            // if nothing is needed to be flushed, there must be something wrong with the
-            // initialization
-            LOG.warn(
-                    "Initialize FlinkOffsetBackingStore from empty offset state, this shouldn't happen.");
-            return;
-        }
-
-        // trigger flushing
-        Future<Void> flushFuture =
-                offsetWriter.doFlush(
-                        (error, result) -> {
-                            if (error != null) {
-                                LOG.error("Failed to flush initial offset.", error);
-                            } else {
-                                LOG.debug("Successfully flush initial offset.");
-                            }
-                        });
-
-        // wait until flushing finished
-        try {
-            flushFuture.get(FLUSH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
-            LOG.info(
-                    "Flush offsets successfully, partition: {}, offsets: {}",
-                    debeziumOffset.sourcePartition,
-                    debeziumOffset.sourceOffset);
-        } catch (InterruptedException e) {
-            LOG.warn("Flush offsets interrupted, cancelling.", e);
-            offsetWriter.cancelFlush();
-            // the exception is swallowed here, so hand the interruption back to the caller
-            Thread.currentThread().interrupt();
-        } catch (ExecutionException e) {
-            LOG.error("Flush offsets threw an unexpected exception.", e);
-            offsetWriter.cancelFlush();
-        } catch (TimeoutException e) {
-            LOG.error("Timed out waiting to flush offsets to storage.", e);
-            offsetWriter.cancelFlush();
-        }
-    }
-
-    /** Creates the single-threaded executor if none exists yet; otherwise does nothing. */
-    @Override
-    public void start() {
-        if (executor == null) {
-            executor =
-                    Executors.newFixedThreadPool(
-                            1,
-                            ThreadUtils.createThreadFactory(
-                                    this.getClass().getSimpleName() + "-%d", false));
-        }
-    }
-
-    /**
-     * Shuts the executor down, waiting up to 30 seconds for submitted tasks to finish.
-     *
-     * @throws ConnectException if tasks were still pending when the executor was forced to stop
-     */
-    @Override
-    public void stop() {
-        if (executor != null) {
-            executor.shutdown();
-            // Best effort wait for any get() and set() tasks (and caller's callbacks) to complete.
-            try {
-                executor.awaitTermination(30, TimeUnit.SECONDS);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-            }
-            if (!executor.shutdownNow().isEmpty()) {
-                throw new ConnectException(
-                        "Failed to stop FlinkOffsetBackingStore. Exiting without cleanly "
-                                + "shutting down pending tasks and/or callbacks.");
-            }
-            executor = null;
-        }
-    }
-
-    /**
-     * Looks up the given keys on the executor thread.
-     *
-     * @param keys the keys to read
-     * @return a future holding one entry per requested key; a key without a stored value is mapped
-     *     to {@code null}
-     */
-    @Override
-    public Future<Map<ByteBuffer, ByteBuffer>> get(final Collection<ByteBuffer> keys) {
-        return executor.submit(
-                () -> {
-                    Map<ByteBuffer, ByteBuffer> result = new HashMap<>();
-                    for (ByteBuffer key : keys) {
-                        result.put(key, data.get(key));
-                    }
-                    return result;
-                });
-    }
-
-    /**
-     * Stores the given entries on the executor thread and then notifies the callback.
-     *
-     * @param values the entries to store
-     * @param callback invoked with {@code (null, null)} once the entries are stored; may be
-     *     {@code null}
-     * @return a future that completes when the entries have been stored
-     */
-    @Override
-    public Future<Void> set(
-            final Map<ByteBuffer, ByteBuffer> values, final Callback<Void> callback) {
-        return executor.submit(
-                () -> {
-                    for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) {
-                        data.put(entry.getKey(), entry.getValue());
-                    }
-                    if (callback != null) {
-                        callback.onCompletion(null, null);
-                    }
-                    return null;
-                });
-    }
-}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/Handover.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/Handover.java
deleted file mode 100644
index 9af2ff246..000000000
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/Handover.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-import io.debezium.engine.ChangeEvent;
-import java.io.Closeable;
-import java.util.Collections;
-import java.util.List;
-import javax.annotation.concurrent.GuardedBy;
-import javax.annotation.concurrent.ThreadSafe;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.kafka.connect.source.SourceRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The Handover is a utility to hand over data (a buffer of records) and exception from a
- * <i>producer</i> thread to a <i>consumer</i> thread. It effectively behaves like a "size one
- * blocking queue", with some extras around exception reporting, closing, and waking up thread
- * without {@link Thread#interrupt() interrupting} threads.
- *
- * <p>This class is used in the Flink Debezium Engine Consumer to hand over data and exceptions
- * between the thread that runs the DebeziumEngine class and the main thread.
- *
- * <p>The Handover can also be "closed", signalling from one thread to the other that the thread
- * has terminated.
- */
-@ThreadSafe
-@Internal
-public class Handover implements Closeable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(Handover.class);
-    private final Object lock = new Object();
-
-    @GuardedBy("lock")
-    private List<ChangeEvent<SourceRecord, SourceRecord>> next;
-
-    @GuardedBy("lock")
-    private Throwable error;
-
-    // NOTE(review): only ever assigned false within this class, so it never releases a waiting
-    // producer on its own; every access happens while holding 'lock'.
-    private boolean wakeupProducer;
-
-    /**
-     * Polls the next element from the Handover, possibly blocking until the next element is
-     * available. This method behaves similar to polling from a blocking queue.
-     *
-     * <p>If an exception was handed in by the producer ({@link #reportError(Throwable)}), then that
-     * exception is thrown rather than an element being returned.
-     *
-     * @return The next element (buffer of records, never null).
-     * @throws ClosedException Thrown if the Handover was {@link #close() closed}.
-     * @throws Exception Rethrows exceptions from the {@link #reportError(Throwable)} method.
-     */
-    public List<ChangeEvent<SourceRecord, SourceRecord>> pollNext() throws Exception {
-        synchronized (lock) {
-            while (next == null && error == null) {
-                lock.wait();
-            }
-            List<ChangeEvent<SourceRecord, SourceRecord>> n = next;
-            if (n != null) {
-                next = null;
-                lock.notifyAll();
-                return n;
-            } else {
-                ExceptionUtils.rethrowException(error, error.getMessage());
-
-                // this statement cannot be reached since the above method always throws an
-                // exception this is only here to silence the compiler and any warnings
-                return Collections.emptyList();
-            }
-        }
-    }
-
-    /**
-     * Hands over an element from the producer. If the Handover already has an element that was not
-     * yet picked up by the consumer thread, this call blocks until the consumer picks up that
-     * previous element.
-     *
-     * <p>This behavior is similar to a "size one" blocking queue.
-     *
-     * @param element The next element to hand over.
-     * @throws InterruptedException Thrown, if the thread is interrupted while blocking for the
-     *     Handover to be empty.
-     */
-    public void produce(final List<ChangeEvent<SourceRecord, SourceRecord>> element)
-            throws InterruptedException {
-
-        checkNotNull(element);
-
-        synchronized (lock) {
-            while (next != null && !wakeupProducer) {
-                lock.wait();
-            }
-
-            wakeupProducer = false;
-
-            // an error marks this as closed for the producer
-            if (error != null) {
-                ExceptionUtils.rethrow(error, error.getMessage());
-            } else {
-                // if there is no error, then this is open and can accept this element
-                next = element;
-                lock.notifyAll();
-            }
-        }
-    }
-
-    /**
-     * Reports an exception. The consumer will throw the given exception immediately, if it is
-     * currently blocked in the {@link #pollNext()} method, or the next time it calls that method.
-     *
-     * <p>After this method has been called, no call to either {@link #produce( List)} or {@link
-     * #pollNext()} will ever return regularly any more, but will always return exceptionally.
-     *
-     * <p>If another exception was already reported, this method does nothing.
-     *
-     * <p>For the producer, the Handover will appear as if it was {@link #close() closed}.
-     *
-     * @param t The exception to report.
-     */
-    public void reportError(Throwable t) {
-        checkNotNull(t);
-
-        synchronized (lock) {
-            LOG.error("Reporting error:", t);
-            // do not override the initial exception
-            if (error == null) {
-                error = t;
-            }
-            next = null;
-            lock.notifyAll();
-        }
-    }
-
-    /**
-     * Return whether there is an error.
-     *
-     * @return whether there is an error
-     */
-    public boolean hasError() {
-        // 'error' is guarded by 'lock'; read it under the lock so that a value written by
-        // another thread is reliably visible here
-        synchronized (lock) {
-            return error != null;
-        }
-    }
-
-    /**
-     * Closes the handover. Both the {@link #produce(List)} method and the {@link #pollNext()} will
-     * throw a {@link ClosedException} on any currently blocking and future invocations.
-     *
-     * <p>If an exception was previously reported via the {@link #reportError(Throwable)} method,
-     * that exception will not be overridden. The consumer thread will throw that exception upon
-     * calling {@link #pollNext()}, rather than the {@code ClosedException}.
-     */
-    @Override
-    public void close() {
-        synchronized (lock) {
-            next = null;
-            wakeupProducer = false;
-
-            if (error == null) {
-                error = new ClosedException();
-            }
-            lock.notifyAll();
-        }
-    }
-
-    // ------------------------------------------------------------------------
-
-    /**
-     * An exception thrown by the Handover in the {@link #pollNext()} or {@link #produce(List)}
-     * method, after the Handover was closed via {@link #close()}.
-     */
-    public static final class ClosedException extends Exception {
-
-        private static final long serialVersionUID = 1L;
-    }
-}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/SchemaRecord.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/SchemaRecord.java
deleted file mode 100644
index bcc48ce52..000000000
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/SchemaRecord.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
-
-import io.debezium.document.Document;
-import io.debezium.document.DocumentWriter;
-import io.debezium.relational.history.HistoryRecord;
-import io.debezium.relational.history.TableChanges.TableChange;
-import java.io.IOException;
-import javax.annotation.Nullable;
-
-/**
- * A schema change event that wraps exactly one of two payloads: a {@link HistoryRecord} or the
- * document form of a {@link TableChange}.
- *
- * <p>The {@link HistoryRecord} will be used by {@link FlinkDatabaseHistory} which keeps full
- * history of table change events for all tables, the {@link TableChange} will be used by {@link
- * FlinkDatabaseSchemaHistory} which keeps the latest table change for each table.
- */
-public class SchemaRecord {
-
-    @Nullable private final HistoryRecord historyRecord;
-
-    @Nullable private final Document tableChangeDoc;
-
-    /** Wraps the given history record; the table change document is left unset. */
-    public SchemaRecord(HistoryRecord historyRecord) {
-        this.tableChangeDoc = null;
-        this.historyRecord = historyRecord;
-    }
-
-    /**
-     * Wraps the given document. When a {@link HistoryRecord} built from it reports itself valid,
-     * the document is kept as a history record; otherwise it is kept as a table change document.
-     */
-    public SchemaRecord(Document document) {
-        final boolean describesHistory = new HistoryRecord(document).isValid();
-        this.historyRecord = describesHistory ? new HistoryRecord(document) : null;
-        this.tableChangeDoc = describesHistory ? null : document;
-    }
-
-    /** Returns the wrapped history record, or {@code null} if none is held. */
-    @Nullable
-    public HistoryRecord getHistoryRecord() {
-        return historyRecord;
-    }
-
-    /** Returns the wrapped table change document, or {@code null} if none is held. */
-    @Nullable
-    public Document getTableChangeDoc() {
-        return tableChangeDoc;
-    }
-
-    /** Tells whether a history record is held. */
-    public boolean isHistoryRecord() {
-        return historyRecord != null;
-    }
-
-    /** Tells whether a table change document is held. */
-    public boolean isTableChangeRecord() {
-        return tableChangeDoc != null;
-    }
-
-    /** Returns the history record's document when one is held, else the table change document. */
-    public Document toDocument() {
-        return historyRecord != null ? historyRecord.document() : tableChangeDoc;
-    }
-
-    /** Renders the document with the default writer, falling back to the identity string. */
-    @Override
-    public String toString() {
-        try {
-            return DocumentWriter.defaultWriter().write(toDocument());
-        } catch (IOException e) {
-            return super.toString();
-        }
-    }
-}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/utils/DatabaseHistoryUtil.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/utils/DatabaseHistoryUtil.java
index b00d12024..958638234 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/utils/DatabaseHistoryUtil.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/utils/DatabaseHistoryUtil.java
@@ -18,8 +18,8 @@
 
 package org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils;
 
+import com.ververica.cdc.debezium.internal.SchemaRecord;
 import org.apache.inlong.sort.singletenant.flink.cdc.debezium.DebeziumSourceFunction;
-import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.SchemaRecord;
 import io.debezium.relational.history.DatabaseHistory;
 import java.util.Collection;
 import java.util.Collections;
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/utils/ObjectUtils.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/utils/ObjectUtils.java
new file mode 100644
index 000000000..5f4e2732f
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/utils/ObjectUtils.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+/** Utilities for operation on {@link Object}. */
+public class ObjectUtils {
+
+    /**
+     * Returns a number {@code Object} whose value is {@code (number + augend)}.
+     *
+     * <p>The result is boxed in the same type as {@code number}. Overflow of {@code Integer} and
+     * {@code Long} is deliberately not handled, because widening would change the object type.
+     *
+     * @param number an {@code Integer}, {@code Long}, {@code BigInteger} or {@code BigDecimal}
+     * @param augend the value to add
+     * @return the sum, of the same type as {@code number}
+     * @throws UnsupportedOperationException if {@code number} has any other type
+     * @throws NullPointerException if {@code number} is {@code null}
+     */
+    public static Object plus(Object number, int augend) {
+        if (number instanceof Integer) {
+            return (int) number + augend;
+        } else if (number instanceof Long) {
+            return (long) number + augend;
+        } else if (number instanceof BigInteger) {
+            return ((BigInteger) number).add(BigInteger.valueOf(augend));
+        } else if (number instanceof BigDecimal) {
+            return ((BigDecimal) number).add(BigDecimal.valueOf(augend));
+        } else {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Unsupported type %s for numeric plus.",
+                            number.getClass().getSimpleName()));
+        }
+    }
+
+    /**
+     * Returns the difference {@code BigDecimal} whose value is {@code (minuend - subtrahend)}.
+     *
+     * @param minuend an {@code Integer}, {@code Long}, {@code BigInteger} or {@code BigDecimal}
+     * @param subtrahend a value of exactly the same class as {@code minuend}
+     * @return the difference as a {@code BigDecimal}
+     * @throws IllegalStateException if the two operands are of different classes
+     * @throws UnsupportedOperationException if the operands are of an unsupported type
+     * @throws NullPointerException if either operand is {@code null}
+     */
+    public static BigDecimal minus(Object minuend, Object subtrahend) {
+        if (!minuend.getClass().equals(subtrahend.getClass())) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Unsupported operand type, the minuend type %s is different with subtrahend type %s.",
+                            minuend.getClass().getSimpleName(),
+                            subtrahend.getClass().getSimpleName()));
+        }
+        if (minuend instanceof Integer) {
+            return BigDecimal.valueOf((int) minuend).subtract(BigDecimal.valueOf((int) subtrahend));
+        } else if (minuend instanceof Long) {
+            return BigDecimal.valueOf((long) minuend)
+                    .subtract(BigDecimal.valueOf((long) subtrahend));
+        } else if (minuend instanceof BigInteger) {
+            // BigDecimal(BigInteger) yields the same scale-0 value as parsing the decimal string
+            return new BigDecimal(((BigInteger) minuend).subtract((BigInteger) subtrahend));
+        } else if (minuend instanceof BigDecimal) {
+            return ((BigDecimal) minuend).subtract((BigDecimal) subtrahend);
+        } else {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Unsupported type %s for numeric minus.",
+                            minuend.getClass().getSimpleName()));
+        }
+    }
+
+    /**
+     * Compares two objects.
+     *
+     * <p>When {@code obj1} is {@link Comparable} and both objects have exactly the same class,
+     * their natural ordering is used. In every other case the objects are compared through their
+     * {@code toString()} representations, so mixed types never raise a {@code
+     * ClassCastException}.
+     *
+     * @return The value {@code 0} if {@code obj1} is equal to the {@code obj2}; a value less than
+     *     {@code 0} if the {@code obj1} is less than the {@code obj2}; and a value greater than
+     *     {@code 0} if the {@code obj1} is greater than the {@code obj2}.
+     * @throws NullPointerException if either object is {@code null}
+     */
+    @SuppressWarnings("unchecked")
+    public static int compare(Object obj1, Object obj2) {
+        if (obj1 instanceof Comparable && obj1.getClass().equals(obj2.getClass())) {
+            // safe: both operands share one runtime class, which implements Comparable
+            return ((Comparable<Object>) obj1).compareTo(obj2);
+        } else {
+            return obj1.toString().compareTo(obj2.toString());
+        }
+    }
+
+    /**
+     * Compares two Double numeric object.
+     *
+     * <p>Finite values are compared by decimal value, so {@code 0.0} and {@code -0.0} are equal.
+     * {@code NaN} and the infinities have no {@code BigDecimal} form and are ordered the way
+     * {@link Double#compare(double, double)} orders them.
+     *
+     * @return -1, 0, or 1 as this {@code arg1} is numerically less than, equal to, or greater than
+     *     {@code arg2}.
+     */
+    public static int doubleCompare(double arg1, double arg2) {
+        boolean bothFinite =
+                !Double.isNaN(arg1)
+                        && !Double.isInfinite(arg1)
+                        && !Double.isNaN(arg2)
+                        && !Double.isInfinite(arg2);
+        if (!bothFinite) {
+            // BigDecimal.valueOf(double) throws NumberFormatException for NaN and infinities
+            return Integer.signum(Double.compare(arg1, arg2));
+        }
+        BigDecimal bigDecimal1 = BigDecimal.valueOf(arg1);
+        BigDecimal bigDecimal2 = BigDecimal.valueOf(arg2);
+        return bigDecimal1.compareTo(bigDecimal2);
+    }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/DebeziumUtils.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/DebeziumUtils.java
new file mode 100644
index 000000000..008bbdd51
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/DebeziumUtils.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql;
+
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
+import io.debezium.config.Configuration;
+import io.debezium.connector.mysql.MySqlConnection;
+import io.debezium.connector.mysql.MySqlConnectorConfig;
+import io.debezium.connector.mysql.MySqlDatabaseSchema;
+import io.debezium.connector.mysql.MySqlTopicSelector;
+import io.debezium.connector.mysql.MySqlValueConverters;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.jdbc.JdbcValueConverters;
+import io.debezium.jdbc.TemporalPrecisionMode;
+import io.debezium.relational.TableId;
+import io.debezium.schema.TopicSelector;
+import io.debezium.util.SchemaNameAdjuster;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceConfig;
+import org.apache.inlong.sort.singletenant.flink.cdc.mysql.connection.JdbcConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utilities related to Debezium.
+ */
+public class DebeziumUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DebeziumUtils.class);
+
+    /**
+     * Builds a {@link JdbcConnection} from the Debezium configuration of {@code sourceConfig} and
+     * a {@link JdbcConnectionFactory}, then connects it.
+     *
+     * @param sourceConfig the source configuration to connect with
+     * @return the connected {@link JdbcConnection}
+     * @throws FlinkRuntimeException wrapping any exception raised while connecting
+     */
+    public static JdbcConnection openJdbcConnection(MySqlSourceConfig sourceConfig) {
+        final JdbcConnection connection =
+                new JdbcConnection(
+                        sourceConfig.getDbzConfiguration(),
+                        new JdbcConnectionFactory(sourceConfig));
+        try {
+            connection.connect();
+            return connection;
+        } catch (Exception e) {
+            LOG.error("Failed to open MySQL connection", e);
+            throw new FlinkRuntimeException(e);
+        }
+    }
+
+    /**
+     * Builds a {@link MySqlConnection} from the Debezium configuration held by {@code
+     * sourceConfig}; the connection is only constructed here, not opened.
+     */
+    public static MySqlConnection createMySqlConnection(MySqlSourceConfig sourceConfig) {
+        final MySqlConnection.MySqlConnectionConfiguration connectionConfiguration =
+                new MySqlConnection.MySqlConnectionConfiguration(
+                        sourceConfig.getDbzConfiguration());
+        return new MySqlConnection(connectionConfiguration);
+    }
+
+    /**
+     * Builds a {@link MySqlConnection} from the given Debezium configuration; the connection is
+     * only constructed here, not opened.
+     */
+    public static MySqlConnection createMySqlConnection(Configuration dbzConfiguration) {
+        final MySqlConnection.MySqlConnectionConfiguration connectionConfiguration =
+                new MySqlConnection.MySqlConnectionConfiguration(dbzConfiguration);
+        return new MySqlConnection(connectionConfiguration);
+    }
+
+    /**
+     * Builds a {@link BinaryLogClient} for consuming mysql binlog, using the hostname, port,
+     * username and password exposed by a {@link MySqlConnectorConfig} created from {@code
+     * dbzConfiguration}.
+     */
+    public static BinaryLogClient createBinaryClient(Configuration dbzConfiguration) {
+        final MySqlConnectorConfig mySqlConfig = new MySqlConnectorConfig(dbzConfiguration);
+        final String host = mySqlConfig.hostname();
+        final int port = mySqlConfig.port();
+        final String user = mySqlConfig.username();
+        final String password = mySqlConfig.password();
+        return new BinaryLogClient(host, port, user, password);
+    }
+
+    /**
+     * Assembles a {@link MySqlDatabaseSchema} to monitor the latest MySql database schemas.
+     *
+     * <p>The topic selector, schema name adjuster and value converters handed to the schema are
+     * all derived here from {@code dbzMySqlConfig}.
+     *
+     * @param dbzMySqlConfig the Debezium MySQL connector configuration
+     * @param isTableIdCaseSensitive forwarded unchanged to the {@link MySqlDatabaseSchema}
+     */
+    public static MySqlDatabaseSchema createMySqlDatabaseSchema(
+            MySqlConnectorConfig dbzMySqlConfig, boolean isTableIdCaseSensitive) {
+        final TopicSelector<TableId> selector = MySqlTopicSelector.defaultSelector(dbzMySqlConfig);
+        final SchemaNameAdjuster nameAdjuster = SchemaNameAdjuster.create();
+        final MySqlValueConverters converters = getValueConverters(dbzMySqlConfig);
+        return new MySqlDatabaseSchema(
+                dbzMySqlConfig, converters, selector, nameAdjuster, isTableIdCaseSensitive);
+    }
+
+    /**
+     * Fetch current binlog offsets in MySql Server by running {@code SHOW MASTER STATUS}.
+     *
+     * <p>The binlog file name is read from column 1 and the position from column 2. The GTID set
+     * is read from column 5 when the result has more than four columns, and is {@code null}
+     * otherwise.
+     *
+     * @param jdbc the connection used to run the statement
+     * @return the offset built from the first row of the result
+     * @throws FlinkRuntimeException if the statement returns no row or fails with a {@link
+     *     SQLException}
+     */
+    public static BinlogOffset currentBinlogOffset(JdbcConnection jdbc) {
+        final String showMasterStmt = "SHOW MASTER STATUS";
+        // one message serves both failure modes: no row returned, or the query itself failing
+        final String failureMessage =
+                "Cannot read the binlog filename and position via '"
+                        + showMasterStmt
+                        + "'. Make sure your server is correctly configured";
+        try {
+            return jdbc.queryAndMap(
+                    showMasterStmt,
+                    rs -> {
+                        if (!rs.next()) {
+                            throw new FlinkRuntimeException(failureMessage);
+                        }
+                        final String fileName = rs.getString(1);
+                        final long position = rs.getLong(2);
+                        String gtids = null;
+                        if (rs.getMetaData().getColumnCount() > 4) {
+                            gtids = rs.getString(5);
+                        }
+                        return new BinlogOffset(fileName, position, 0L, 0, 0, gtids, null);
+                    });
+        } catch (SQLException e) {
+            throw new FlinkRuntimeException(failureMessage, e);
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Derives the {@link MySqlValueConverters} from the temporal precision, decimal,
+     * unsigned-bigint, binary-handling and time-adjuster settings of the given connector config.
+     */
+    private static MySqlValueConverters getValueConverters(MySqlConnectorConfig dbzMySqlConfig) {
+        final TemporalPrecisionMode temporalMode = dbzMySqlConfig.getTemporalPrecisionMode();
+        final JdbcValueConverters.DecimalMode decimals = dbzMySqlConfig.getDecimalMode();
+
+        final Configuration rawConfig = dbzMySqlConfig.getConfig();
+        final String unsignedHandling =
+                rawConfig.getString(MySqlConnectorConfig.BIGINT_UNSIGNED_HANDLING_MODE);
+        final JdbcValueConverters.BigIntUnsignedMode unsignedBigInts =
+                MySqlConnectorConfig.BigIntUnsignedHandlingMode.parse(unsignedHandling)
+                        .asBigIntUnsignedMode();
+        final boolean adjustTime =
+                rawConfig.getBoolean(MySqlConnectorConfig.ENABLE_TIME_ADJUSTER);
+
+        return new MySqlValueConverters(
+                decimals,
+                temporalMode,
+                unsignedBigInts,
+                dbzMySqlConfig.binaryHandlingMode(),
+                // identity adjuster when the time adjuster is switched off
+                adjustTime ? MySqlValueConverters::adjustTemporal : x -> x,
+                MySqlValueConverters::defaultParsingErrorHandler);
+    }
+
+    /** Reads the system variables of the MySQL instance by running {@code SHOW VARIABLES}. */
+    public static Map<String, String> readMySqlSystemVariables(JdbcConnection connection) {
+        final String showVariablesStmt = "SHOW VARIABLES";
+        return querySystemVariables(connection, showVariablesStmt);
+    }
+
+    /**
+     * Runs the given statement and collects its first two columns as name/value pairs; rows
+     * where either column is {@code null} are skipped.
+     *
+     * @throws FlinkRuntimeException if the query fails
+     */
+    private static Map<String, String> querySystemVariables(
+            JdbcConnection connection, String statement) {
+        final Map<String, String> result = new HashMap<>();
+        try {
+            connection.query(
+                    statement,
+                    rows -> {
+                        while (rows.next()) {
+                            final String name = rows.getString(1);
+                            final String setting = rows.getString(2);
+                            if (name == null || setting == null) {
+                                continue;
+                            }
+                            result.put(name, setting);
+                        }
+                    });
+        } catch (SQLException e) {
+            throw new FlinkRuntimeException("Error reading MySQL variables: " + e.getMessage(), e);
+        }
+        return result;
+    }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/MySqlSource.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/MySqlSource.java
new file mode 100644
index 000000000..adbdb0132
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/MySqlSource.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql;
+
+import com.ververica.cdc.debezium.internal.DebeziumOffset;
+import org.apache.flink.table.data.RowData;
+import org.apache.inlong.sort.singletenant.flink.cdc.mysql.table.StartupOptions;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.DebeziumSourceFunction;
+import io.debezium.connector.mysql.MySqlConnector;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.DebeziumSourceFunction.LEGACY_IMPLEMENTATION_KEY;
+import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.DebeziumSourceFunction.LEGACY_IMPLEMENTATION_VALUE;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Legacy builder-based entry point that assembles the Debezium properties for a MySQL binlog
+ * source and creates the corresponding {@link DebeziumSourceFunction}.
+ */
+@Deprecated
+public class MySqlSource {
+
+    private static final String DATABASE_SERVER_NAME = "mysql_binlog_source";
+
+    /** Creates a new, empty {@link Builder}. */
+    public static <T> Builder<T> builder() {
+        return new Builder<T>();
+    }
+
+    /** Builder for the legacy MySQL {@link DebeziumSourceFunction}. */
+    @Deprecated
+    public static class Builder<T> {
+
+        private int port = 3306; // default 3306 port
+        private String hostname;
+        private String[] databaseList;
+        private String username;
+        private String password;
+        private Integer serverId;
+        private String serverTimeZone;
+        private String[] tableList;
+        private Properties dbzProperties;
+        private StartupOptions startupOptions = StartupOptions.initial();
+        private DebeziumDeserializationSchema<RowData> deserializer;
+
+        /** IP address or hostname of the MySQL database server. Required. */
+        public Builder<T> hostname(String hostname) {
+            this.hostname = hostname;
+            return this;
+        }
+
+        /** Integer port number of the MySQL database server. */
+        public Builder<T> port(int port) {
+            this.port = port;
+            return this;
+        }
+
+        /**
+         * An optional list of regular expressions that match database names to be monitored; any
+         * database name not included in the whitelist will be excluded from monitoring. By default
+         * all databases will be monitored.
+         */
+        public Builder<T> databaseList(String... databaseList) {
+            this.databaseList = databaseList;
+            return this;
+        }
+
+        /**
+         * An optional list of regular expressions that match fully-qualified table identifiers for
+         * tables to be monitored; any table not included in the list will be excluded from
+         * monitoring. Each identifier is of the form databaseName.tableName. By default the
+         * connector will monitor every non-system table in each monitored database.
+         */
+        public Builder<T> tableList(String... tableList) {
+            this.tableList = tableList;
+            return this;
+        }
+
+        /** Name of the MySQL user to use when connecting to the MySQL database server. */
+        public Builder<T> username(String username) {
+            this.username = username;
+            return this;
+        }
+
+        /** Password to use when connecting to the MySQL database server. */
+        public Builder<T> password(String password) {
+            this.password = password;
+            return this;
+        }
+
+        /**
+         * The session time zone in database server, e.g. "America/Los_Angeles". It controls how the
+         * TIMESTAMP type in MYSQL converted to STRING. See more
+         * https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-temporal-types
+         */
+        public Builder<T> serverTimeZone(String timeZone) {
+            this.serverTimeZone = timeZone;
+            return this;
+        }
+
+        /**
+         * A numeric ID of this database client, which must be unique across all currently-running
+         * database processes in the MySQL cluster. This connector joins the MySQL database cluster
+         * as another server (with this unique ID) so it can read the binlog. By default, a random
+         * number is generated between 5400 and 6400, though we recommend setting an explicit value.
+         */
+        public Builder<T> serverId(int serverId) {
+            this.serverId = serverId;
+            return this;
+        }
+
+        /** The Debezium MySQL connector properties. For example, "snapshot.mode". */
+        public Builder<T> debeziumProperties(Properties properties) {
+            this.dbzProperties = properties;
+            return this;
+        }
+
+        /**
+         * The deserializer used to convert from consumed {@link
+         * org.apache.kafka.connect.source.SourceRecord}.
+         */
+        public Builder<T> deserializer(DebeziumDeserializationSchema<RowData> deserializer) {
+            this.deserializer = deserializer;
+            return this;
+        }
+
+        /** Specifies the startup options. */
+        public Builder<T> startupOptions(StartupOptions startupOptions) {
+            this.startupOptions = startupOptions;
+            return this;
+        }
+
+        /**
+         * Assembles the Debezium properties from the builder state and creates the source
+         * function. The builder itself is not modified, so it can be reused for further builds.
+         *
+         * @return the configured {@link DebeziumSourceFunction}
+         * @throws NullPointerException if hostname, username or password is missing, or if the
+         *     startup mode is TIMESTAMP and no deserializer was set
+         * @throws UnsupportedOperationException if the startup mode is not supported
+         */
+        public DebeziumSourceFunction<RowData> build() {
+            Properties props = new Properties();
+            props.setProperty("connector.class", MySqlConnector.class.getCanonicalName());
+            // The server name is hard coded because we don't need to distinguish it. Docs:
+            // Logical name that identifies and provides a namespace for the particular MySQL
+            // database server/cluster being monitored. The logical name should be unique across
+            // all other connectors, since it is used as a prefix for all Kafka topic names
+            // emanating from this connector. Only alphanumeric characters and underscores should
+            // be used.
+            props.setProperty("database.server.name", DATABASE_SERVER_NAME);
+            props.setProperty("database.hostname", checkNotNull(hostname, "hostname is required"));
+            props.setProperty("database.user", checkNotNull(username, "username is required"));
+            props.setProperty("database.password", checkNotNull(password, "password is required"));
+            props.setProperty("database.port", String.valueOf(port));
+            props.setProperty("database.history.skip.unparseable.ddl", String.valueOf(true));
+            // debezium use "long" mode to handle unsigned bigint by default,
+            // but it'll cause lose of precise when the value is larger than 2^63,
+            // so use "precise" mode to avoid it.
+            props.setProperty("bigint.unsigned.handling.mode", "precise");
+
+            if (serverId != null) {
+                props.setProperty("database.server.id", String.valueOf(serverId));
+            }
+            if (databaseList != null) {
+                props.setProperty("database.whitelist", String.join(",", databaseList));
+            }
+            if (tableList != null) {
+                props.setProperty("table.whitelist", String.join(",", tableList));
+            }
+            if (serverTimeZone != null) {
+                props.setProperty("database.serverTimezone", serverTimeZone);
+            }
+
+            // Use a local so that TIMESTAMP mode never overwrites the builder's own field;
+            // otherwise every additional build() call would wrap the deserializer once more.
+            DebeziumDeserializationSchema<RowData> effectiveDeserializer = deserializer;
+            DebeziumOffset specificOffset = null;
+            switch (startupOptions.startupMode) {
+                case INITIAL:
+                    props.setProperty("snapshot.mode", "initial");
+                    break;
+
+                case EARLIEST_OFFSET:
+                    props.setProperty("snapshot.mode", "never");
+                    break;
+
+                case LATEST_OFFSET:
+                    props.setProperty("snapshot.mode", "schema_only");
+                    break;
+
+                case SPECIFIC_OFFSETS:
+                    // if binlog offset is specified, 'snapshot.mode=schema_only_recovery' must
+                    // be configured. It only snapshots the schemas, not the data,
+                    // and continue binlog reading from the specified offset
+                    props.setProperty("snapshot.mode", "schema_only_recovery");
+
+                    specificOffset = new DebeziumOffset();
+                    Map<String, String> sourcePartition = new HashMap<>();
+                    sourcePartition.put("server", DATABASE_SERVER_NAME);
+                    specificOffset.setSourcePartition(sourcePartition);
+
+                    Map<String, Object> sourceOffset = new HashMap<>();
+                    sourceOffset.put("file", startupOptions.specificOffsetFile);
+                    sourceOffset.put("pos", startupOptions.specificOffsetPos);
+                    specificOffset.setSourceOffset(sourceOffset);
+                    break;
+
+                case TIMESTAMP:
+                    checkNotNull(deserializer, "deserializer is required for TIMESTAMP startup");
+                    props.setProperty("snapshot.mode", "never");
+                    // read from the earliest offset and drop events until the timestamp is reached
+                    effectiveDeserializer =
+                            new SeekBinlogToTimestampFilter<>(
+                                    startupOptions.startupTimestampMillis, deserializer);
+                    break;
+
+                default:
+                    throw new UnsupportedOperationException();
+            }
+
+            if (dbzProperties != null) {
+                // user supplied Debezium properties override the defaults set above
+                props.putAll(dbzProperties);
+                // Add default configurations for compatibility when set the legacy mysql connector
+                // implementation
+                if (LEGACY_IMPLEMENTATION_VALUE.equals(
+                        dbzProperties.get(LEGACY_IMPLEMENTATION_KEY))) {
+                    props.put("transforms", "snapshotasinsert");
+                    props.put(
+                            "transforms.snapshotasinsert.type",
+                            "io.debezium.connector.mysql.transforms.ReadToInsertEvent");
+                }
+            }
+
+            return new DebeziumSourceFunction<>(
+                    effectiveDeserializer, props, specificOffset, new MySqlValidator(props));
+        }
+    }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/MySqlValidator.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/MySqlValidator.java
new file mode 100644
index 000000000..eb7cea3ba
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/MySqlValidator.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql;
+
+import io.debezium.config.Configuration;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceConfig;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.Validator;
+import io.debezium.jdbc.JdbcConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Properties;
+
+/**
+ * The validator for MySql: it checks that the version of the database is larger than or equal
+ * to 5.6. It also requires the binlog format in the database is ROW and row image is FULL.
+ */
+public class MySqlValidator implements Validator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MySqlValidator.class);
+    private static final long serialVersionUID = 1L;
+
+    private static final String BINLOG_FORMAT_ROW = "ROW";
+    private static final String BINLOG_FORMAT_IMAGE_FULL = "FULL";
+
+    private final Properties dbzProperties;
+    private final MySqlSourceConfig sourceConfig;
+
+    /** Creates a validator for the legacy source, which only has raw Debezium properties. */
+    public MySqlValidator(Properties dbzProperties) {
+        this.dbzProperties = dbzProperties;
+        this.sourceConfig = null;
+    }
+
+    /** Creates a validator that connects through the given source config. */
+    public MySqlValidator(MySqlSourceConfig sourceConfig) {
+        this.dbzProperties = sourceConfig.getDbzProperties();
+        this.sourceConfig = sourceConfig;
+    }
+
+    /**
+     * Connects to MySQL and checks the server version, the binlog format and the binlog row
+     * image.
+     *
+     * @throws ValidationException if one of the checks fails
+     * @throws TableException if a SQL error occurs while connecting or validating
+     * @throws FlinkRuntimeException if validation succeeded but the connection cannot be closed
+     */
+    @Override
+    public void validate() {
+        JdbcConnection connection = null;
+        boolean validated = false;
+        try {
+            if (sourceConfig != null) {
+                connection = DebeziumUtils.openJdbcConnection(sourceConfig);
+            } else {
+                // for the legacy source
+                connection = DebeziumUtils.createMySqlConnection(Configuration.from(dbzProperties));
+            }
+            checkVersion(connection);
+            checkBinlogFormat(connection);
+            checkBinlogRowImage(connection);
+            validated = true;
+        } catch (SQLException ex) {
+            throw new TableException(
+                    "Unexpected error while connecting to MySQL and validating", ex);
+        } finally {
+            if (connection != null) {
+                try {
+                    connection.close();
+                } catch (SQLException e) {
+                    if (validated) {
+                        throw new FlinkRuntimeException("Closing connection error", e);
+                    }
+                    // A validation failure is already propagating; throwing here would replace
+                    // it with a far less useful error, so only log the close failure.
+                    LOG.warn("Closing connection error", e);
+                }
+            }
+        }
+        LOG.info("MySQL validation passed.");
+    }
+
+    /**
+     * Check whether the server version is at least 5.6.
+     *
+     * @throws ValidationException if the version is too old or cannot be parsed
+     */
+    private void checkVersion(JdbcConnection connection) throws SQLException {
+        String version =
+                connection.queryAndMap("SELECT VERSION()", rs -> rs.next() ? rs.getString(1) : "");
+        if (version == null) {
+            version = "";
+        }
+
+        // Only care about the major version and minor version
+        int[] versionNumbers;
+        try {
+            versionNumbers =
+                    Arrays.stream(version.split("\\."))
+                            .limit(2)
+                            .map(String::trim)
+                            .mapToInt(Integer::parseInt)
+                            .toArray();
+        } catch (NumberFormatException e) {
+            throw new ValidationException(
+                    String.format("Cannot parse the MySql version '%s'.", version), e);
+        }
+        if (versionNumbers.length < 2) {
+            throw new ValidationException(
+                    String.format("Cannot parse the MySql version '%s'.", version));
+        }
+
+        int major = versionNumbers[0];
+        int minor = versionNumbers[1];
+        boolean isSatisfied = major > 5 || (major == 5 && minor >= 6);
+        if (!isSatisfied) {
+            throw new ValidationException(
+                    String.format(
+                            "Currently Flink MySql CDC connector only supports MySql "
+                                    + "whose version is larger or equal to 5.6, but actual is %s.%s.",
+                            major, minor));
+        }
+    }
+
+    /** Check whether the binlog format is ROW. */
+    private void checkBinlogFormat(JdbcConnection connection) throws SQLException {
+        String mode =
+                connection
+                        .queryAndMap(
+                                "SHOW GLOBAL VARIABLES LIKE 'binlog_format'",
+                                rs -> rs.next() ? rs.getString(2) : "")
+                        .toUpperCase();
+        if (!BINLOG_FORMAT_ROW.equals(mode)) {
+            throw new ValidationException(
+                    String.format(
+                            "The MySQL server is configured with binlog_format %s rather than %s, which is "
+                                    + "required for this connector to work properly. "
+                                + "Change the MySQL configuration to use a "
+                                    + "binlog_format=ROW and restart the connector.",
+                            mode, BINLOG_FORMAT_ROW));
+        }
+    }
+
+    /** Check whether the binlog row image is FULL. */
+    private void checkBinlogRowImage(JdbcConnection connection) throws SQLException {
+        String rowImage =
+                connection
+                        .queryAndMap(
+                                "SHOW GLOBAL VARIABLES LIKE 'binlog_row_image'",
+                                rs -> {
+                                    if (rs.next()) {
+                                        return rs.getString(2);
+                                    }
+                                    // This setting was introduced in MySQL 5.6+ with default of
+                                    // 'FULL'.
+                                    // For older versions, assume 'FULL'.
+                                    return BINLOG_FORMAT_IMAGE_FULL;
+                                })
+                        .toUpperCase();
+        if (!rowImage.equals(BINLOG_FORMAT_IMAGE_FULL)) {
+            throw new ValidationException(
+                    String.format(
+                            "The MySQL server is configured with binlog_row_image %s rather than %s, which is "
+                                    + "required for this connector to work properly. "
+                                + "Change the MySQL configuration to use a "
+                                    + "binlog_row_image=FULL and restart the connector.",
+                            rowImage, BINLOG_FORMAT_IMAGE_FULL));
+        }
+    }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/SeekBinlogToTimestampFilter.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/SeekBinlogToTimestampFilter.java
new file mode 100644
index 000000000..a2613510b
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/SeekBinlogToTimestampFilter.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql;
+
+import io.debezium.relational.history.TableChanges.TableChange;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Collector;
+
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.DebeziumDeserializationSchema;
+import io.debezium.data.Envelope;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link DebeziumDeserializationSchema} which wraps a real {@link DebeziumDeserializationSchema}
+ * to seek binlog to the specific timestamp: records are dropped until the first one whose source
+ * timestamp is at or after the startup timestamp, after which every record is forwarded.
+ */
+public class SeekBinlogToTimestampFilter<T> implements DebeziumDeserializationSchema<T> {
+    private static final long serialVersionUID = -4450118969976653497L;
+    protected static final Logger LOG = LoggerFactory.getLogger(SeekBinlogToTimestampFilter.class);
+
+    private final long startupTimestampMillis;
+    private final DebeziumDeserializationSchema<T> serializer;
+
+    // seek progress; both fields are transient
+    private transient boolean find = false;
+    private transient long filtered = 0L;
+
+    /**
+     * @param startupTimestampMillis records with an earlier source timestamp are dropped
+     * @param serializer the wrapped schema that receives the records once the seek is done
+     */
+    public SeekBinlogToTimestampFilter(
+            long startupTimestampMillis, DebeziumDeserializationSchema<T> serializer) {
+        this.startupTimestampMillis = startupTimestampMillis;
+        this.serializer = serializer;
+    }
+
+    /** Forwards the record to the wrapped schema once the startup timestamp has been reached. */
+    @Override
+    public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
+        if (shouldForward(record)) {
+            serializer.deserialize(record, out);
+        }
+    }
+
+    /**
+     * Same filtering as {@link #deserialize(SourceRecord, Collector)}, forwarding the table
+     * change along with the record. Without the delegation every record passed through this
+     * overload would be silently dropped.
+     */
+    @Override
+    public void deserialize(SourceRecord record, Collector<T> out, TableChange tableChange)
+        throws Exception {
+        if (shouldForward(record)) {
+            serializer.deserialize(record, out, tableChange);
+        }
+    }
+
+    @Override
+    public TypeInformation<T> getProducedType() {
+        return serializer.getProducedType();
+    }
+
+    /**
+     * Decides whether the record has to be handed to the wrapped schema, updating the seek
+     * state and logging the progress on the way.
+     */
+    private boolean shouldForward(SourceRecord record) {
+        if (find) {
+            return true;
+        }
+
+        if (filtered == 0) {
+            LOG.info("Begin to seek binlog to the specific timestamp {}.", startupTimestampMillis);
+        }
+
+        Struct value = (Struct) record.value();
+        Struct source = value.getStruct(Envelope.FieldName.SOURCE);
+        Long ts = source.getInt64(Envelope.FieldName.TIMESTAMP);
+        if (ts != null && ts >= startupTimestampMillis) {
+            find = true;
+            LOG.info(
+                    "Successfully seek to the specific timestamp {} with filtered {} change events.",
+                    startupTimestampMillis,
+                    filtered);
+            return true;
+        }
+
+        filtered++;
+        if (filtered % 10000 == 0) {
+            LOG.info(
+                    "Seeking binlog to specific timestamp with filtered {} change events.",
+                    filtered);
+        }
+        return false;
+    }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/config/MySqlSourceConfig.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/config/MySqlSourceConfig.java
new file mode 100644
index 000000000..835778fed
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/config/MySqlSourceConfig.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql.config;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import io.debezium.config.Configuration;
+import io.debezium.connector.mysql.MySqlConnectorConfig;
+import io.debezium.relational.RelationalTableFilters;
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+import javax.annotation.Nullable;
+import org.apache.inlong.sort.singletenant.flink.cdc.mysql.table.StartupOptions;
+
+/**
+ * Immutable holder of the settings of a MySQL source. Besides the plain option values it keeps
+ * the Debezium properties together with the {@link Configuration} and {@link
+ * MySqlConnectorConfig} built from them in the constructor.
+ */
+public class MySqlSourceConfig implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final String hostname;
+    private final int port;
+    private final String username;
+    private final String password;
+    private final List<String> databaseList;
+    private final List<String> tableList;
+    @Nullable private final ServerIdRange serverIdRange;
+    private final StartupOptions startupOptions;
+    private final int splitSize;
+    private final int splitMetaGroupSize;
+    private final int fetchSize;
+    private final String serverTimeZone;
+    private final Duration connectTimeout;
+    private final int connectMaxRetries;
+    private final int connectionPoolSize;
+    private final double distributionFactorUpper;
+    private final double distributionFactorLower;
+    private final boolean includeSchemaChanges;
+    private final boolean scanNewlyAddedTableEnabled;
+    private final Properties jdbcProperties;
+
+    // --------------------------------------------------------------------------------------------
+    // Debezium Configurations
+    // --------------------------------------------------------------------------------------------
+    private final Properties dbzProperties;
+    private final Configuration dbzConfiguration;
+    private final MySqlConnectorConfig dbzMySqlConfig;
+
+    /**
+     * Package-private constructor. Hostname, username, database list, table list, startup
+     * options, server time zone, connect timeout and Debezium properties must not be null.
+     */
+    MySqlSourceConfig(
+            String hostname,
+            int port,
+            String username,
+            String password,
+            List<String> databaseList,
+            List<String> tableList,
+            @Nullable ServerIdRange serverIdRange,
+            StartupOptions startupOptions,
+            int splitSize,
+            int splitMetaGroupSize,
+            int fetchSize,
+            String serverTimeZone,
+            Duration connectTimeout,
+            int connectMaxRetries,
+            int connectionPoolSize,
+            double distributionFactorUpper,
+            double distributionFactorLower,
+            boolean includeSchemaChanges,
+            boolean scanNewlyAddedTableEnabled,
+            Properties dbzProperties,
+            Properties jdbcProperties) {
+        // connection endpoint and credentials
+        this.hostname = checkNotNull(hostname);
+        this.port = port;
+        this.username = checkNotNull(username);
+        this.password = password;
+
+        // captured databases and tables
+        this.databaseList = checkNotNull(databaseList);
+        this.tableList = checkNotNull(tableList);
+
+        // reader identity and startup behaviour
+        this.serverIdRange = serverIdRange;
+        this.startupOptions = checkNotNull(startupOptions);
+
+        // split and fetch sizing
+        this.splitSize = splitSize;
+        this.splitMetaGroupSize = splitMetaGroupSize;
+        this.fetchSize = fetchSize;
+
+        // session and connection handling
+        this.serverTimeZone = checkNotNull(serverTimeZone);
+        this.connectTimeout = checkNotNull(connectTimeout);
+        this.connectMaxRetries = connectMaxRetries;
+        this.connectionPoolSize = connectionPoolSize;
+
+        // remaining scalar options
+        this.distributionFactorUpper = distributionFactorUpper;
+        this.distributionFactorLower = distributionFactorLower;
+        this.includeSchemaChanges = includeSchemaChanges;
+        this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
+
+        // Debezium views are derived once from the raw properties
+        this.dbzProperties = checkNotNull(dbzProperties);
+        this.dbzConfiguration = Configuration.from(this.dbzProperties);
+        this.dbzMySqlConfig = new MySqlConnectorConfig(this.dbzConfiguration);
+        this.jdbcProperties = jdbcProperties;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Connection
+    // --------------------------------------------------------------------------------------------
+
+    public String getHostname() {
+        return hostname;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public String getUsername() {
+        return username;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public String getServerTimeZone() {
+        return serverTimeZone;
+    }
+
+    public Duration getConnectTimeout() {
+        return connectTimeout;
+    }
+
+    public int getConnectMaxRetries() {
+        return connectMaxRetries;
+    }
+
+    public int getConnectionPoolSize() {
+        return connectionPoolSize;
+    }
+
+    public Properties getJdbcProperties() {
+        return jdbcProperties;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Captured tables and startup
+    // --------------------------------------------------------------------------------------------
+
+    public List<String> getDatabaseList() {
+        return databaseList;
+    }
+
+    public List<String> getTableList() {
+        return tableList;
+    }
+
+    @Nullable
+    public ServerIdRange getServerIdRange() {
+        return serverIdRange;
+    }
+
+    public StartupOptions getStartupOptions() {
+        return startupOptions;
+    }
+
+    public boolean isIncludeSchemaChanges() {
+        return includeSchemaChanges;
+    }
+
+    public boolean isScanNewlyAddedTableEnabled() {
+        return scanNewlyAddedTableEnabled;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Split and fetch sizing
+    // --------------------------------------------------------------------------------------------
+
+    public int getSplitSize() {
+        return splitSize;
+    }
+
+    public int getSplitMetaGroupSize() {
+        return splitMetaGroupSize;
+    }
+
+    public int getFetchSize() {
+        return fetchSize;
+    }
+
+    public double getDistributionFactorUpper() {
+        return distributionFactorUpper;
+    }
+
+    public double getDistributionFactorLower() {
+        return distributionFactorLower;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Debezium
+    // --------------------------------------------------------------------------------------------
+
+    public Properties getDbzProperties() {
+        return dbzProperties;
+    }
+
+    public Configuration getDbzConfiguration() {
+        return dbzConfiguration;
+    }
+
+    public MySqlConnectorConfig getMySqlConnectorConfig() {
+        return dbzMySqlConfig;
+    }
+
+    /** Returns the table filters of the underlying {@link MySqlConnectorConfig}. */
+    public RelationalTableFilters getTableFilters() {
+        return dbzMySqlConfig.getTableFilters();
+    }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/config/MySqlSourceOptions.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/config/MySqlSourceOptions.java
new file mode 100644
index 000000000..9885cf74f
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/config/MySqlSourceOptions.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql.config;
+
+import java.time.Duration;
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * Option definitions of the MySQL CDC source, declared as Flink {@link ConfigOption}s.
+ *
+ * <p>Every constant carries the option key, its value type, its default value (if any) and a
+ * human-readable description. Options annotated with {@link Experimental} are intentionally left
+ * out of the user documentation.
+ */
+public class MySqlSourceOptions {
+
+    public static final ConfigOption<String> HOSTNAME =
+            ConfigOptions.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("IP address or hostname of the MySQL database server.");
+
+    public static final ConfigOption<Integer> PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .defaultValue(3306)
+                    .withDescription("Integer port number of the MySQL database server.");
+
+    // The description used to say "Name of the MySQL database", which is what 'database-name' is
+    // for; this option is the user name of the connection.
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the MySQL user to use when connecting to the MySQL database server.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the MySQL database server.");
+
+    public static final ConfigOption<String> DATABASE_NAME =
+            ConfigOptions.key("database-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Database name of the MySQL server to monitor.");
+
+    public static final ConfigOption<String> TABLE_NAME =
+            ConfigOptions.key("table-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Table name of the MySQL database to monitor.");
+
+    public static final ConfigOption<String> SERVER_TIME_ZONE =
+            ConfigOptions.key("server-time-zone")
+                    .stringType()
+                    .defaultValue("UTC")
+                    .withDescription("The session time zone in database server.");
+
+    public static final ConfigOption<String> SERVER_ID =
+            ConfigOptions.key("server-id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "A numeric ID or a numeric ID range of this database client, "
+                                    + "The numeric ID syntax is like '5400', the numeric ID range syntax "
+                                    + "is like '5400-5408', The numeric ID range syntax is recommended when "
+                                    + "'scan.incremental.snapshot.enabled' enabled. Every ID must be unique across all "
+                                    + "currently-running database processes in the MySQL cluster. This connector"
+                                    + " joins the MySQL  cluster as another server (with this unique ID) "
+                                    + "so it can read the binlog. By default, a random number is generated between"
+                                    + " 5400 and 6400, though we recommend setting an explicit value.");
+
+    public static final ConfigOption<Boolean> SCAN_INCREMENTAL_SNAPSHOT_ENABLED =
+            ConfigOptions.key("scan.incremental.snapshot.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Incremental snapshot is a new mechanism to read snapshot of a table. "
+                                    + "Compared to the old snapshot mechanism, the incremental "
+                                    + "snapshot has many advantages, including:\n"
+                                    + "(1) source can be parallel during snapshot reading, \n"
+                                    + "(2) source can perform checkpoints in the chunk "
+                                    + "granularity during snapshot reading, \n"
+                                    + "(3) source doesn't need to acquire global read lock "
+                                    + "(FLUSH TABLES WITH READ LOCK) before snapshot reading.\n"
+                                    + "If you would like the source run in parallel, each parallel "
+                                    + "reader should have an unique server id, "
+                                    + "so the 'server-id' must be a range like '5400-6400', "
+                                    + "and the range must be larger than the parallelism.");
+
+    public static final ConfigOption<Integer> SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.size")
+                    .intType()
+                    .defaultValue(8096)
+                    .withDescription(
+                            "The chunk size (number of rows) of table snapshot, "
+                                    + "captured tables are split into multiple "
+                                    + "chunks when read the snapshot of table.");
+
+    public static final ConfigOption<Integer> SCAN_SNAPSHOT_FETCH_SIZE =
+            ConfigOptions.key("scan.snapshot.fetch.size")
+                    .intType()
+                    .defaultValue(1024)
+                    .withDescription(
+                            "The maximum fetch size for per poll when read table snapshot.");
+
+    public static final ConfigOption<Duration> CONNECT_TIMEOUT =
+            ConfigOptions.key("connect.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "The maximum time that the connector should wait after "
+                                    + "trying to connect to the MySQL database server before timing out.");
+
+    public static final ConfigOption<Integer> CONNECTION_POOL_SIZE =
+            ConfigOptions.key("connection.pool.size")
+                    .intType()
+                    .defaultValue(20)
+                    .withDescription("The connection pool size.");
+
+    public static final ConfigOption<Integer> CONNECT_MAX_RETRIES =
+            ConfigOptions.key("connect.max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription(
+                            "The max retry times that the connector should retry to build "
+                                    + "MySQL database server connection.");
+
+    public static final ConfigOption<String> SCAN_STARTUP_MODE =
+            ConfigOptions.key("scan.startup.mode")
+                    .stringType()
+                    .defaultValue("initial")
+                    .withDescription(
+                            "Optional startup mode for MySQL CDC consumer, valid "
+                                    + "enumerations are "
+                                    + "\"initial\", \"earliest-offset\", "
+                                    + "\"latest-offset\", \"timestamp\"\n"
+                                    + "or \"specific-offset\"");
+
+    public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSET_FILE =
+            ConfigOptions.key("scan.startup.specific-offset.file")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional offsets used in case of \"specific-offset\" startup mode");
+
+    public static final ConfigOption<Integer> SCAN_STARTUP_SPECIFIC_OFFSET_POS =
+            ConfigOptions.key("scan.startup.specific-offset.pos")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional offsets used in case of \"specific-offset\" startup mode");
+
+    public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS =
+            ConfigOptions.key("scan.startup.timestamp-millis")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional timestamp used in case of \"timestamp\" startup mode");
+
+    public static final ConfigOption<Duration> HEARTBEAT_INTERVAL =
+            ConfigOptions.key("heartbeat.interval")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "Optional interval of sending heartbeat event for tracing the "
+                                    + "latest available binlog offsets");
+
+    public static final ConfigOption<Boolean> APPEND_MODE =
+            ConfigOptions.key("append-mode")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether works as append source.");
+
+    // ----------------------------------------------------------------------------
+    // experimental options, won't add them to documentation
+    // ----------------------------------------------------------------------------
+    @Experimental
+    public static final ConfigOption<Integer> CHUNK_META_GROUP_SIZE =
+            ConfigOptions.key("chunk-meta.group.size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "The group size of chunk meta, if the meta size exceeds the "
+                                    + "group size, the meta will be divided into multiple groups.");
+
+    @Experimental
+    public static final ConfigOption<Double> SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND =
+            ConfigOptions.key("split-key.even-distribution.factor.upper-bound")
+                    .doubleType()
+                    .defaultValue(1000.0d)
+                    .withDescription(
+                            "The upper bound of split key distribution factor. The distribution "
+                                    + "factor is used to determine whether the"
+                                    + " table is evenly distribution or not."
+                                    + " The table chunks would use evenly calculation optimization "
+                                    + "when the data distribution is even,"
+                                    + " and the query MySQL for splitting would happen when it is uneven."
+                                    + " The distribution factor could be calculated by (MAX(id) - "
+                                    + "MIN(id) + 1) / rowCount.");
+
+    @Experimental
+    public static final ConfigOption<Double> SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND =
+            ConfigOptions.key("split-key.even-distribution.factor.lower-bound")
+                    .doubleType()
+                    .defaultValue(0.05d)
+                    .withDescription(
+                            "The lower bound of split key distribution factor. The distribution "
+                                    + "factor is used to determine whether the"
+                                    + " table is evenly distribution or not."
+                                    + " The table chunks would use evenly calculation optimization "
+                                    + "when the data distribution is even,"
+                                    + " and the query MySQL for splitting would happen when it is uneven."
+                                    + " The distribution factor could be calculated by (MAX(id) - "
+                                    + "MIN(id) + 1) / rowCount.");
+
+    @Experimental
+    public static final ConfigOption<Boolean> SCAN_NEWLY_ADDED_TABLE_ENABLED =
+            ConfigOptions.key("scan.newly-added-table.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to scan the newly added tables or not, by default is false.");
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/config/ServerIdRange.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/config/ServerIdRange.java
new file mode 100644
index 000000000..aff76f2aa
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/config/ServerIdRange.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql.config;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+import java.io.Serializable;
+import javax.annotation.Nullable;
+
+/**
+ * An inclusive range of server ids, written either as a range such as {@code 5400-5408} or as a
+ * single id such as {@code 5400} (a range whose start and end are equal).
+ *
+ * <p>{@link #getServerId(int)} maps a subtask index onto one id of the range, so the range has to
+ * contain at least as many ids as there are subtasks.
+ *
+ * @see MySqlSourceOptions#SERVER_ID
+ */
+public class ServerIdRange implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    /** Start of the range (inclusive). */
+    private final int startServerId;
+
+    /** End of the range (inclusive). */
+    private final int endServerId;
+
+    /**
+     * Creates a range covering {@code startServerId} to {@code endServerId}, both inclusive.
+     *
+     * @param startServerId first id of the range
+     * @param endServerId last id of the range
+     */
+    public ServerIdRange(int startServerId, int endServerId) {
+        this.startServerId = startServerId;
+        this.endServerId = endServerId;
+    }
+
+    /** Returns the first id of the range (inclusive). */
+    public int getStartServerId() {
+        return startServerId;
+    }
+
+    /** Returns the last id of the range (inclusive). */
+    public int getEndServerId() {
+        return endServerId;
+    }
+
+    /**
+     * Returns the server id assigned to the given subtask, i.e. {@code start + subTaskId}.
+     *
+     * @param subTaskId zero-based subtask index
+     * @return the server id of that subtask, always within the range
+     * @throws IllegalArgumentException if {@code subTaskId} is negative or the range holds fewer
+     *     than {@code subTaskId + 1} ids
+     */
+    public int getServerId(int subTaskId) {
+        checkArgument(subTaskId >= 0, "Subtask ID %s shouldn't be a negative number.", subTaskId);
+        // Subtask ids are zero-based: valid values are 0 .. numberOfServerIds - 1. Using '>' here
+        // would accept subTaskId == numberOfServerIds and hand out an id one past endServerId.
+        if (subTaskId >= getNumberOfServerIds()) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Subtask ID %s is out of server id range %s, "
+                                    + "please adjust the server id range to "
+                                    + "make the number of server id larger than "
+                                    + "the source parallelism.",
+                            subTaskId, this.toString()));
+        }
+        return startServerId + subTaskId;
+    }
+
+    /** Returns how many ids the range contains; both boundaries count. */
+    public int getNumberOfServerIds() {
+        return endServerId - startServerId + 1;
+    }
+
+    /** Formats the range as {@code start-end}, or as the single id when start equals end. */
+    @Override
+    public String toString() {
+        if (startServerId == endServerId) {
+            return String.valueOf(startServerId);
+        } else {
+            return startServerId + "-" + endServerId;
+        }
+    }
+
+    /**
+     * Returns a {@link ServerIdRange} parsed from a server id range string like '5400-5408' or
+     * from a single server id like '5400'. Surrounding whitespace of each id is ignored.
+     *
+     * @param range the textual range, may be {@code null}
+     * @return the parsed range, or {@code null} if {@code range} is {@code null}
+     * @throws IllegalArgumentException if the text contains '-' but is not of the form 'a-b'
+     * @throws IllegalStateException if an id is not a valid integer
+     */
+    public static @Nullable ServerIdRange from(@Nullable String range) {
+        if (range == null) {
+            return null;
+        }
+        if (range.contains("-")) {
+            String[] idArray = range.split("-");
+            if (idArray.length != 2) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "The server id range should be syntax like '5400-5500', but got: %s",
+                                range));
+            }
+            return new ServerIdRange(
+                    parseServerId(idArray[0].trim()), parseServerId(idArray[1].trim()));
+        } else {
+            // Trim here as well so that a single id is treated like the parts of a range.
+            int serverId = parseServerId(range.trim());
+            return new ServerIdRange(serverId, serverId);
+        }
+    }
+
+    /** Parses one server id, wrapping a malformed number into an {@link IllegalStateException}. */
+    private static int parseServerId(String serverIdValue) {
+        try {
+            return Integer.parseInt(serverIdValue);
+        } catch (NumberFormatException e) {
+            throw new IllegalStateException(
+                    String.format("The server id %s is not a valid numeric.", serverIdValue), e);
+        }
+    }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/connection/ConnectionPoolId.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/connection/ConnectionPoolId.java
new file mode 100644
index 000000000..8fcef7de9
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/connection/ConnectionPoolId.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql.connection;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Identifies a connection pool by the host and port it connects to.
+ *
+ * <p>Two identifiers are equal when both host and port are equal, which makes instances usable as
+ * map keys.
+ */
+public class ConnectionPoolId implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+    private final String host;
+    private final int port;
+
+    /**
+     * Creates an identifier for the given endpoint.
+     *
+     * @param host host part of the endpoint
+     * @param port port part of the endpoint
+     */
+    public ConnectionPoolId(String host, int port) {
+        this.host = host;
+        this.port = port;
+    }
+
+    /** Compares host and port; anything that is not a {@link ConnectionPoolId} is unequal. */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o instanceof ConnectionPoolId) {
+            final ConnectionPoolId other = (ConnectionPoolId) o;
+            return port == other.port && Objects.equals(host, other.host);
+        }
+        return false;
+    }
+
+    /** Hash over host and port, consistent with {@link #equals(Object)}. */
+    @Override
+    public int hashCode() {
+        return Objects.hash(host, port);
+    }
+
+    /** Renders the identifier as {@code host:port}. */
+    @Override
+    public String toString() {
+        return new StringBuilder().append(host).append(':').append(port).toString();
+    }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/connection/ConnectionPools.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/connection/ConnectionPools.java
new file mode 100644
index 000000000..ab3a6b36d
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/connection/ConnectionPools.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql.connection;
+
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.flink.annotation.Internal;
+import org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceConfig;
+
+/** A collection of JDBC connection pools, each of which is a {@link HikariDataSource}. */
+@Internal
+public interface ConnectionPools {
+
+    /**
+     * Gets the connection pool registered under the given id, creating a new pool if none exists
+     * for that id yet.
+     *
+     * @param poolId identifier of the requested pool
+     * @param sourceConfig source configuration handed to the implementation for pool creation
+     * @return the connection pool for {@code poolId}
+     */
+    HikariDataSource getOrCreateConnectionPool(
+            ConnectionPoolId poolId, MySqlSourceConfig sourceConfig);
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/connection/JdbcConnectionFactory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/connection/JdbcConnectionFactory.java
new file mode 100644
index 000000000..6454cb50e
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/connection/JdbcConnectionFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql.connection;
+
+import com.zaxxer.hikari.HikariDataSource;
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.jdbc.JdbcConnection;
+import java.sql.Connection;
+import java.sql.SQLException;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A factory to create JDBC connection for MySQL.
+ *
+ * <p>Connections are borrowed from the pool that {@link JdbcConnectionPools} keeps for the host
+ * and port of the source configuration; failed attempts are retried.
+ */
+public class JdbcConnectionFactory implements JdbcConnection.ConnectionFactory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcConnectionFactory.class);
+
+    /** Pause between two consecutive connection attempts, in milliseconds. */
+    private static final long RETRY_INTERVAL_MILLIS = 300L;
+
+    private final MySqlSourceConfig sourceConfig;
+
+    public JdbcConnectionFactory(MySqlSourceConfig sourceConfig) {
+        this.sourceConfig = sourceConfig;
+    }
+
+    /**
+     * Borrows a connection from the pool of the configured host and port.
+     *
+     * <p>Up to {@code sourceConfig.getConnectMaxRetries()} attempts are made, pausing
+     * {@link #RETRY_INTERVAL_MILLIS} ms between them. If that limit is not positive, exactly one
+     * attempt is made and its {@link SQLException} is propagated unchanged.
+     *
+     * @param config not used; all settings are taken from the source configuration
+     * @return a connection obtained from the pool
+     * @throws SQLException if the single attempt made with a non-positive retry limit fails
+     * @throws FlinkRuntimeException if the last attempt fails, or if the thread is interrupted
+     *     while waiting for the next attempt
+     */
+    @Override
+    public Connection connect(JdbcConfiguration config) throws SQLException {
+        final int connectRetryTimes = sourceConfig.getConnectMaxRetries();
+
+        final ConnectionPoolId connectionPoolId =
+                new ConnectionPoolId(sourceConfig.getHostname(), sourceConfig.getPort());
+
+        HikariDataSource dataSource =
+                JdbcConnectionPools.getInstance()
+                        .getOrCreateConnectionPool(connectionPoolId, sourceConfig);
+
+        for (int attempt = 1; attempt <= connectRetryTimes; attempt++) {
+            try {
+                return dataSource.getConnection();
+            } catch (SQLException e) {
+                if (attempt == connectRetryTimes) {
+                    LOG.error("Get connection failed after retry {} times", attempt);
+                    throw new FlinkRuntimeException(e);
+                }
+                // Keep the cause in the log; it is otherwise lost once the next attempt starts.
+                LOG.warn("Get connection failed, retry times {}", attempt, e);
+                sleepBeforeRetry();
+            }
+        }
+        // Only reached when the retry limit is not positive: make a single, unguarded attempt.
+        return dataSource.getConnection();
+    }
+
+    /** Waits for the retry interval, restoring the interrupt flag if the wait is interrupted. */
+    private static void sleepBeforeRetry() {
+        try {
+            Thread.sleep(RETRY_INTERVAL_MILLIS);
+        } catch (InterruptedException ie) {
+            // Re-assert the interrupt so that callers further up can still observe it.
+            Thread.currentThread().interrupt();
+            throw new FlinkRuntimeException(
+                    "Failed to get connection, interrupted while doing another attempt", ie);
+        }
+    }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/connection/JdbcConnectionPools.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/connection/JdbcConnectionPools.java
new file mode 100644
index 000000000..72801b509
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/connection/JdbcConnectionPools.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql.connection;
+
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.connection.PooledDataSourceFactory.createPooledDataSource;
+
+import com.zaxxer.hikari.HikariDataSource;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Singleton registry of {@link HikariDataSource} pools, keyed by {@link ConnectionPoolId}.
+ *
+ * <p>A pool is created the first time its id is requested and reused afterwards.
+ * NOTE(review): the key is the pool id only, so the {@code sourceConfig} of later requests for an
+ * already registered id is ignored.
+ */
+public class JdbcConnectionPools implements ConnectionPools {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcConnectionPools.class);
+
+    private static final JdbcConnectionPools INSTANCE = new JdbcConnectionPools();
+    private final Map<ConnectionPoolId, HikariDataSource> pools = new HashMap<>();
+
+    private JdbcConnectionPools() {
+    }
+
+    /** Returns the single shared instance. */
+    public static JdbcConnectionPools getInstance() {
+        return INSTANCE;
+    }
+
+    /**
+     * Looks up the pool of {@code poolId}, creating and registering it from {@code sourceConfig}
+     * on first use. Lookup and creation happen while holding the lock on the pool map.
+     */
+    @Override
+    public HikariDataSource getOrCreateConnectionPool(
+            ConnectionPoolId poolId, MySqlSourceConfig sourceConfig) {
+        synchronized (pools) {
+            HikariDataSource pool = pools.get(poolId);
+            if (pool == null) {
+                LOG.info("Create and register connection pool {}", poolId);
+                pool = createPooledDataSource(sourceConfig);
+                pools.put(poolId, pool);
+            }
+            return pool;
+        }
+    }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/connection/PooledDataSourceFactory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/connection/PooledDataSourceFactory.java
new file mode 100644
index 000000000..eafe67dbf
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/connection/PooledDataSourceFactory.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql.connection;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import io.debezium.connector.mysql.MySqlConnectorConfig;
+import java.util.Properties;
+import org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceConfig;
+
+/**
+ * Static factory that builds a pooled {@link HikariDataSource} from a {@link MySqlSourceConfig}.
+ */
+public class PooledDataSourceFactory {
+
+    public static final String JDBC_URL_PATTERN =
+            "jdbc:mysql://%s:%s/?useInformationSchema=true&nullCatalogMeansCurrent=false&useUnicode=true";
+    public static final String CONNECTION_POOL_PREFIX = "connection-pool-";
+    public static final String SERVER_TIMEZONE_KEY = "serverTimezone";
+    public static final int MINIMUM_POOL_SIZE = 1;
+    private static final Properties DEFAULT_JDBC_PROPERTIES = initializeDefaultJdbcProperties();
+
+    private PooledDataSourceFactory() {
+    }
+
+    /**
+     * Creates a new Hikari pool for the host and port of the given source configuration.
+     *
+     * <p>Credentials, pool size, connection timeout, server time zone and JDBC driver class are
+     * all read from {@code sourceConfig}; the pool is named after the host and port.
+     *
+     * @param sourceConfig configuration describing the MySQL endpoint and the pool settings
+     * @return a newly created data source
+     */
+    public static HikariDataSource createPooledDataSource(MySqlSourceConfig sourceConfig) {
+        final HikariConfig hikariConfig = new HikariConfig();
+
+        final String host = sourceConfig.getHostname();
+        final int portNumber = sourceConfig.getPort();
+        final Properties userJdbcProperties = sourceConfig.getJdbcProperties();
+
+        // endpoint and credentials
+        hikariConfig.setPoolName(CONNECTION_POOL_PREFIX + host + ":" + portNumber);
+        hikariConfig.setJdbcUrl(formatJdbcUrl(host, portNumber, userJdbcProperties));
+        hikariConfig.setUsername(sourceConfig.getUsername());
+        hikariConfig.setPassword(sourceConfig.getPassword());
+
+        // pool sizing and timeouts
+        hikariConfig.setMinimumIdle(MINIMUM_POOL_SIZE);
+        hikariConfig.setMaximumPoolSize(sourceConfig.getConnectionPoolSize());
+        hikariConfig.setConnectionTimeout(sourceConfig.getConnectTimeout().toMillis());
+
+        // driver settings
+        hikariConfig.addDataSourceProperty(SERVER_TIMEZONE_KEY, sourceConfig.getServerTimeZone());
+        final String driverClassName =
+                sourceConfig.getDbzConfiguration().getString(MySqlConnectorConfig.JDBC_DRIVER);
+        hikariConfig.setDriverClassName(driverClassName);
+
+        // optional optimization configurations for pooled DataSource
+        hikariConfig.addDataSourceProperty("cachePrepStmts", "true");
+        hikariConfig.addDataSourceProperty("prepStmtCacheSize", "250");
+        hikariConfig.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
+
+        return new HikariDataSource(hikariConfig);
+    }
+
+    /**
+     * Builds the JDBC URL: the fixed pattern filled with host and port, followed by one
+     * {@code &key=value} pair per property. Values in {@code jdbcProperties} override the
+     * defaults of the same key.
+     */
+    private static String formatJdbcUrl(String hostName, int port, Properties jdbcProperties) {
+        final Properties merged = new Properties();
+        merged.putAll(DEFAULT_JDBC_PROPERTIES);
+        merged.putAll(jdbcProperties);
+
+        final StringBuilder url =
+                new StringBuilder(String.format(JDBC_URL_PATTERN, hostName, port));
+        merged.forEach((name, val) -> url.append("&").append(name).append("=").append(val));
+        return url.toString();
+    }
+
+    /** Creates the JDBC properties that are applied unless overridden by the configuration. */
+    private static Properties initializeDefaultJdbcProperties() {
+        final Properties defaults = new Properties();
+        defaults.setProperty("zeroDateTimeBehavior", "CONVERT_TO_NULL");
+        defaults.setProperty("characterEncoding", "UTF-8");
+        defaults.setProperty("characterSetResults", "UTF-8");
+        return defaults;
+    }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/JdbcUrlUtils.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/JdbcUrlUtils.java
new file mode 100644
index 000000000..fdf83b600
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/JdbcUrlUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql.table;
+
+import java.util.Map;
+import java.util.Properties;
+
+/** Helpers for extracting JDBC URL properties from table options. */
+public class JdbcUrlUtils {
+
+    /** Table-option keys that start with this prefix are treated as JDBC properties. */
+    public static final String PROPERTIES_PREFIX = "jdbc.properties.";
+
+    /**
+     * Collects every table option whose key starts with {@link #PROPERTIES_PREFIX}.
+     *
+     * @param tableOptions all options declared on the table
+     * @return a new {@link Properties} holding the matching options, with the prefix stripped
+     *     from their keys; empty when no option carries the prefix
+     */
+    public static Properties getJdbcProperties(Map<String, String> tableOptions) {
+        final Properties collected = new Properties();
+        final int prefixLength = PROPERTIES_PREFIX.length();
+        for (Map.Entry<String, String> option : tableOptions.entrySet()) {
+            final String optionKey = option.getKey();
+            if (optionKey.startsWith(PROPERTIES_PREFIX)) {
+                collected.put(optionKey.substring(prefixLength), option.getValue());
+            }
+        }
+        return collected;
+    }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlDeserializationConverterFactory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlDeserializationConverterFactory.java
new file mode 100644
index 000000000..d4b4c7858
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlDeserializationConverterFactory.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql.table;
+
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+
+import com.esri.core.geometry.ogc.OGCGeometry;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.table.DeserializationRuntimeConverter;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.table.DeserializationRuntimeConverterFactory;
+import io.debezium.data.EnumSet;
+import io.debezium.data.geometry.Geometry;
+import io.debezium.data.geometry.Point;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+
+import java.nio.ByteBuffer;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/** Used to create {@link DeserializationRuntimeConverterFactory} specified to MySQL. */
+public class MySqlDeserializationConverterFactory {
+
+    public static DeserializationRuntimeConverterFactory instance() {
+        return new DeserializationRuntimeConverterFactory() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Optional<DeserializationRuntimeConverter> createUserDefinedConverter(
+                    LogicalType logicalType, ZoneId serverTimeZone) {
+                switch (logicalType.getTypeRoot()) {
+                    case CHAR:
+                    case VARCHAR:
+                        return createStringConverter();
+                    case ARRAY:
+                        return createArrayConverter((ArrayType) logicalType);
+                    default:
+                        // fallback to default converter
+                        return Optional.empty();
+                }
+            }
+        };
+    }
+
+    private static Optional<DeserializationRuntimeConverter> createStringConverter() {
+        final ObjectMapper objectMapper = new ObjectMapper();
+        final ObjectWriter objectWriter = objectMapper.writer();
+        return Optional.of(
+                new DeserializationRuntimeConverter() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(Object dbzObj, Schema schema) throws Exception {
+                        // the Geometry datatype in MySQL will be converted to
+                        // a String with Json format
+                        if (Point.LOGICAL_NAME.equals(schema.name())
+                                || Geometry.LOGICAL_NAME.equals(schema.name())) {
+                            try {
+                                Struct geometryStruct = (Struct) dbzObj;
+                                byte[] wkb = geometryStruct.getBytes("wkb");
+                                String geoJson =
+                                        OGCGeometry.fromBinary(ByteBuffer.wrap(wkb)).asGeoJson();
+                                JsonNode originGeoNode = objectMapper.readTree(geoJson);
+                                Optional<Integer> srid =
+                                        Optional.ofNullable(geometryStruct.getInt32("srid"));
+                                Map<String, Object> geometryInfo = new HashMap<>();
+                                String geometryType = originGeoNode.get("type").asText();
+                                geometryInfo.put("type", geometryType);
+                                if (geometryType.equals("GeometryCollection")) {
+                                    geometryInfo.put("geometries", originGeoNode.get("geometries"));
+                                } else {
+                                    geometryInfo.put(
+                                            "coordinates", originGeoNode.get("coordinates"));
+                                }
+                                geometryInfo.put("srid", srid.orElse(0));
+                                return StringData.fromString(
+                                        objectWriter.writeValueAsString(geometryInfo));
+                            } catch (Exception e) {
+                                throw new IllegalArgumentException(
+                                        String.format(
+                                                "Failed to convert %s to geometry JSON.", dbzObj),
+                                        e);
+                            }
+                        } else {
+                            return StringData.fromString(dbzObj.toString());
+                        }
+                    }
+                });
+    }
+
+    private static Optional<DeserializationRuntimeConverter> createArrayConverter(
+            ArrayType arrayType) {
+        if (LogicalTypeChecks.hasFamily(
+                arrayType.getElementType(), LogicalTypeFamily.CHARACTER_STRING)) {
+            // only map MySQL SET type to Flink ARRAY<STRING> type
+            return Optional.of(
+                    new DeserializationRuntimeConverter() {
+                        private static final long serialVersionUID = 1L;
+
+                        @Override
+                        public Object convert(Object dbzObj, Schema schema) throws Exception {
+                            if (EnumSet.LOGICAL_NAME.equals(schema.name())
+                                    && dbzObj instanceof String) {
+                                // for SET datatype in mysql, debezium will always
+                                // return a string split by comma like "a,b,c"
+                                String[] enums = ((String) dbzObj).split(",");
+                                StringData[] elements = new StringData[enums.length];
+                                for (int i = 0; i < enums.length; i++) {
+                                    elements[i] = StringData.fromString(enums[i]);
+                                }
+                                return new GenericArrayData(elements);
+                            } else {
+                                throw new IllegalArgumentException(
+                                        String.format(
+                                                "Unable convert to Flink ARRAY type from unexpected value '%s', "
+                                                        + "only SET type could be converted to ARRAY type for MySQL",
+                                                dbzObj));
+                            }
+                        }
+                    });
+        } else {
+            // otherwise, fallback to default converter
+            return Optional.empty();
+        }
+    }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlReadableMetadata.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlReadableMetadata.java
new file mode 100644
index 000000000..fc456f929
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlReadableMetadata.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql.table;
+
+import io.debezium.relational.Table;
+import io.debezium.relational.history.TableChanges;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+
+import io.debezium.connector.AbstractSourceInfo;
+import io.debezium.data.Envelope;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.table.MetadataConverter;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/**
+ * Defines the supported metadata columns for {@link MySqlTableSource}.
+ *
+ * <p>Each constant couples a metadata key, the {@link DataType} of the metadata column and the
+ * {@link MetadataConverter} that produces the column value for a {@link SourceRecord}.
+ */
+public enum MySqlReadableMetadata {
+    /** Name of the table that contains the row. */
+    TABLE_NAME(
+            "table_name",
+            DataTypes.STRING().notNull(),
+            new MetadataConverter() {
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                public Object read(SourceRecord record) {
+                    Struct messageStruct = (Struct) record.value();
+                    Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
+                    return StringData.fromString(
+                            sourceStruct.getString(AbstractSourceInfo.TABLE_NAME_KEY));
+                }
+            }),
+
+    /** Name of the database that contains the row. */
+    DATABASE_NAME(
+            "database_name",
+            DataTypes.STRING().notNull(),
+            new MetadataConverter() {
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                public Object read(SourceRecord record) {
+                    Struct messageStruct = (Struct) record.value();
+                    Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
+                    return StringData.fromString(
+                            sourceStruct.getString(AbstractSourceInfo.DATABASE_NAME_KEY));
+                }
+            }),
+
+    /**
+     * It indicates the time that the change was made in the database. If the record is read from
+     * snapshot of the table instead of the binlog, the value is always 0.
+     */
+    OP_TS(
+            "op_ts",
+            DataTypes.TIMESTAMP_LTZ(3).notNull(),
+            new MetadataConverter() {
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                public Object read(SourceRecord record) {
+                    Struct messageStruct = (Struct) record.value();
+                    Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
+                    return TimestampData.fromEpochMillis(
+                            (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY));
+                }
+            }),
+
+    /**
+     * Name of the table that contains the row. Reads the same source field as {@link #TABLE_NAME}
+     * but is exposed under the key {@code meta.table_name}.
+     */
+    META_TABLE_NAME(
+        "meta.table_name",
+        DataTypes.STRING().notNull(),
+        new MetadataConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object read(SourceRecord record) {
+                Struct messageStruct = (Struct) record.value();
+                Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
+                return StringData.fromString(
+                    sourceStruct.getString(AbstractSourceInfo.TABLE_NAME_KEY));
+            }
+        }),
+
+    /**
+     * Name of the database that contains the row. Reads the same source field as
+     * {@link #DATABASE_NAME} but is exposed under the key {@code meta.database_name}.
+     */
+    META_DATABASE_NAME(
+        "meta.database_name",
+        DataTypes.STRING().notNull(),
+        new MetadataConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object read(SourceRecord record) {
+                Struct messageStruct = (Struct) record.value();
+                Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
+                return StringData.fromString(
+                    sourceStruct.getString(AbstractSourceInfo.DATABASE_NAME_KEY));
+            }
+        }),
+
+    /**
+     * It indicates the time that the change was made in the database. If the record is read from
+     * snapshot of the table instead of the binlog, the value is always 0. Reads the same source
+     * field as {@link #OP_TS} but is exposed under the key {@code meta.op_ts}.
+     */
+    META_OP_TS(
+        "meta.op_ts",
+        DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
+        new MetadataConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object read(SourceRecord record) {
+                Struct messageStruct = (Struct) record.value();
+                Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
+                return TimestampData.fromEpochMillis(
+                    (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY));
+            }
+        }),
+
+    /**
+     * Operation type, INSERT/UPDATE/DELETE. Debezium CREATE and READ operations are reported as
+     * INSERT, DELETE as DELETE, and every other operation as UPDATE.
+     */
+    OP_TYPE(
+        "meta.op_type",
+        DataTypes.STRING().notNull(),
+        new MetadataConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object read(SourceRecord record) {
+                final Envelope.Operation op = Envelope.operationFor(record);
+                if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
+                    return StringData.fromString("INSERT");
+                } else if (op == Envelope.Operation.DELETE) {
+                    return StringData.fromString("DELETE");
+                } else {
+                    return StringData.fromString("UPDATE");
+                }
+            }
+        }),
+
+    /**
+     * Not important, a simple increment counter. It starts at 0 and grows by one on every read;
+     * the record itself is not inspected.
+     */
+    BATCH_ID(
+        "meta.batch_id",
+        DataTypes.BIGINT().nullable(),
+        new MetadataConverter() {
+            private static final long serialVersionUID = 1L;
+
+            // NOTE(review): plain, unsynchronized field of the converter - confirm that the
+            // converter is only ever read from a single thread.
+            private long id = 0;
+
+            @Override
+            public Object read(SourceRecord record) {
+                return id++;
+            }
+        }),
+
+    /** Source does not emit ddl data, so this flag is always {@code false}. */
+    IS_DDL(
+        "meta.is_ddl",
+        DataTypes.BOOLEAN().notNull(),
+        new MetadataConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object read(SourceRecord record) {
+                return false;
+            }
+        }),
+
+    /**
+     * The update-before data for UPDATE record. The converter returns {@code null} for every
+     * operation other than UPDATE.
+     */
+    OLD(
+        "meta.update_before",
+        DataTypes.ARRAY(
+                DataTypes.MAP(
+                        DataTypes.STRING().nullable(),
+                        DataTypes.STRING().nullable())
+                    .nullable())
+            .nullable(),
+        new MetadataConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object read(SourceRecord record) {
+                final Envelope.Operation op = Envelope.operationFor(record);
+                if (op != Envelope.Operation.UPDATE) {
+                    return null;
+                }
+                // NOTE(review): the raw record is returned here, not a value of the declared
+                // ARRAY<MAP<STRING, STRING>> type - presumably the caller extracts the
+                // update-before image from it; confirm against the deserialization schema.
+                return record;
+            }
+        }),
+
+    /**
+     * Maps each column name of the table to a string of the form {@code typeName(length)}, built
+     * from the table schema. The value is {@code null} when no table schema is supplied.
+     */
+    MYSQL_TYPE(
+        "meta.mysql_type",
+        DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.STRING().nullable()).nullable(),
+        new MetadataConverter() {
+            private static final long serialVersionUID = 1L;
+
+            // Without a table schema there is nothing to report.
+            @Override
+            public Object read(SourceRecord record) {
+                return null;
+            }
+
+            @Override
+            public Object read(
+                SourceRecord record, @Nullable TableChanges.TableChange tableSchema) {
+                if (tableSchema == null) {
+                    return null;
+                }
+                Map<StringData, StringData> mysqlType = new HashMap<>();
+                final Table table = tableSchema.getTable();
+                table.columns()
+                    .forEach(
+                        column -> {
+                            mysqlType.put(
+                                StringData.fromString(column.name()),
+                                StringData.fromString(
+                                    String.format(
+                                        "%s(%d)",
+                                        column.typeName(),
+                                        column.length())));
+                        });
+
+                return new GenericMapData(mysqlType);
+            }
+        }),
+
+    /**
+     * Names of the primary key columns of the table, taken from the table schema. The value is
+     * {@code null} when no table schema is supplied.
+     */
+    PK_NAMES(
+        "meta.pk_names",
+        DataTypes.ARRAY(DataTypes.STRING().nullable()).nullable(),
+        new MetadataConverter() {
+            private static final long serialVersionUID = 1L;
+
+            // Without a table schema there is nothing to report.
+            @Override
+            public Object read(SourceRecord record) {
+                return null;
+            }
+
+            @Override
+            public Object read(
+                SourceRecord record, @Nullable TableChanges.TableChange tableSchema) {
+                if (tableSchema == null) {
+                    return null;
+                }
+                return new GenericArrayData(
+                    tableSchema.getTable().primaryKeyColumnNames().stream()
+                        .map(StringData::fromString)
+                        .toArray());
+            }
+        }),
+
+    /** SQL statement of the change; this converter always yields an empty string. */
+    SQL(
+        "meta.sql",
+        DataTypes.STRING().nullable(),
+        new MetadataConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object read(SourceRecord record) {
+                return StringData.fromString("");
+            }
+        }),
+
+    /**
+     * Maps each column name of the table to the JDBC type reported by the table schema. The
+     * value is {@code null} when no table schema is supplied.
+     */
+    SQL_TYPE(
+        "meta.sql_type",
+        DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.INT().nullable()).nullable(),
+        new MetadataConverter() {
+            private static final long serialVersionUID = 1L;
+
+            // Without a table schema there is nothing to report.
+            @Override
+            public Object read(SourceRecord record) {
+                return null;
+            }
+
+            @Override
+            public Object read(
+                SourceRecord record, @Nullable TableChanges.TableChange tableSchema) {
+                if (tableSchema == null) {
+                    return null;
+                }
+                Map<StringData, Integer> mysqlType = new HashMap<>();
+                final Table table = tableSchema.getTable();
+                table.columns()
+                    .forEach(
+                        column -> {
+                            mysqlType.put(
+                                StringData.fromString(column.name()),
+                                column.jdbcType());
+                        });
+
+                return new GenericMapData(mysqlType);
+            }
+        }),
+
+    /**
+     * Timestamp read from the envelope timestamp field of the record value (as opposed to the
+     * timestamp of the nested source struct used by {@link #OP_TS}).
+     */
+    TS(
+        "meta.ts",
+        DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
+        new MetadataConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object read(SourceRecord record) {
+                Struct messageStruct = (Struct) record.value();
+                return TimestampData.fromEpochMillis(
+                    (Long) messageStruct.get(Envelope.FieldName.TIMESTAMP));
+            }
+        });
+
+
+
+
+    /** Key under which the metadata column is exposed. */
+    private final String key;
+
+    /** Data type of the metadata column. */
+    private final DataType dataType;
+
+    /** Converter that produces the column value from a source record. */
+    private final MetadataConverter converter;
+
+    MySqlReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
+        this.key = key;
+        this.dataType = dataType;
+        this.converter = converter;
+    }
+
+    /** Returns the key under which the metadata column is exposed. */
+    public String getKey() {
+        return key;
+    }
+
+    /** Returns the data type of the metadata column. */
+    public DataType getDataType() {
+        return dataType;
+    }
+
+    /** Returns the converter that produces the column value. */
+    public MetadataConverter getConverter() {
+        return converter;
+    }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableSource.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableSource.java
new file mode 100644
index 000000000..d4611c1e5
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableSource.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql.table;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.table.MetadataConverter;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.DebeziumSourceFunction;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link DynamicTableSource} that describes how to create a MySQL binlog source from a logical
+ * description.
+ */
+public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadata {
+
+    private final ResolvedSchema physicalSchema;
+    private final int port;
+    private final String hostname;
+    private final String database;
+    private final String username;
+    private final String password;
+    private final String serverId;
+    private final String tableName;
+    private final ZoneId serverTimeZone;
+    private final Properties dbzProperties;
+    private final boolean enableParallelRead;
+    private final int splitSize;
+    private final int splitMetaGroupSize;
+    private final int fetchSize;
+    private final Duration connectTimeout;
+    private final int connectionPoolSize;
+    private final int connectMaxRetries;
+    private final double distributionFactorUpper;
+    private final double distributionFactorLower;
+    private final StartupOptions startupOptions;
+    private final boolean scanNewlyAddedTableEnabled;
+    private final Properties jdbcProperties;
+    private final Duration heartbeatInterval;
+
+    // --------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // --------------------------------------------------------------------------------------------
+
+    /** Data type that describes the final output of the source. */
+    protected DataType producedDataType;
+
+    /** Metadata that is appended at the end of a physical source row. */
+    protected List<String> metadataKeys;
+
+    /**
+     * Creates a table source without newly-added-table scanning and without extra JDBC
+     * properties.
+     *
+     * <p>Delegates to the full constructor, passing {@code false} for
+     * {@code scanNewlyAddedTableEnabled} and an empty {@link Properties} instance for
+     * {@code jdbcProperties}; all other arguments are forwarded unchanged.
+     */
+    public MySqlTableSource(
+            ResolvedSchema physicalSchema,
+            int port,
+            String hostname,
+            String database,
+            String tableName,
+            String username,
+            String password,
+            ZoneId serverTimeZone,
+            Properties dbzProperties,
+            @Nullable String serverId,
+            boolean enableParallelRead,
+            int splitSize,
+            int splitMetaGroupSize,
+            int fetchSize,
+            Duration connectTimeout,
+            int connectMaxRetries,
+            int connectionPoolSize,
+            double distributionFactorUpper,
+            double distributionFactorLower,
+            StartupOptions startupOptions,
+            Duration heartbeatInterval) {
+        this(
+                physicalSchema,
+                port,
+                hostname,
+                database,
+                tableName,
+                username,
+                password,
+                serverTimeZone,
+                dbzProperties,
+                serverId,
+                enableParallelRead,
+                splitSize,
+                splitMetaGroupSize,
+                fetchSize,
+                connectTimeout,
+                connectMaxRetries,
+                connectionPoolSize,
+                distributionFactorUpper,
+                distributionFactorLower,
+                startupOptions,
+                false,
+                new Properties(),
+                heartbeatInterval);
+    }
+
+    /**
+     * Creates a table source from the complete set of options.
+     *
+     * <p>{@code hostname}, {@code database}, {@code tableName}, {@code username} and
+     * {@code password} must not be {@code null}; {@code serverId} may be. The produced data type
+     * is initialized from the physical row type of {@code physicalSchema} and the list of
+     * metadata keys starts out empty.
+     *
+     * @throws NullPointerException if one of the mandatory arguments listed above is
+     *     {@code null}
+     */
+    public MySqlTableSource(
+            ResolvedSchema physicalSchema,
+            int port,
+            String hostname,
+            String database,
+            String tableName,
+            String username,
+            String password,
+            ZoneId serverTimeZone,
+            Properties dbzProperties,
+            @Nullable String serverId,
+            boolean enableParallelRead,
+            int splitSize,
+            int splitMetaGroupSize,
+            int fetchSize,
+            Duration connectTimeout,
+            int connectMaxRetries,
+            int connectionPoolSize,
+            double distributionFactorUpper,
+            double distributionFactorLower,
+            StartupOptions startupOptions,
+            boolean scanNewlyAddedTableEnabled,
+            Properties jdbcProperties,
+            Duration heartbeatInterval) {
+        // Mandatory settings, validated first
+        this.hostname = checkNotNull(hostname);
+        this.database = checkNotNull(database);
+        this.tableName = checkNotNull(tableName);
+        this.username = checkNotNull(username);
+        this.password = checkNotNull(password);
+
+        // Server and connection settings
+        this.port = port;
+        this.serverId = serverId;
+        this.serverTimeZone = serverTimeZone;
+        this.connectTimeout = connectTimeout;
+        this.connectMaxRetries = connectMaxRetries;
+        this.connectionPoolSize = connectionPoolSize;
+        this.heartbeatInterval = heartbeatInterval;
+        this.dbzProperties = dbzProperties;
+        this.jdbcProperties = jdbcProperties;
+
+        // Read and split settings
+        this.enableParallelRead = enableParallelRead;
+        this.splitSize = splitSize;
+        this.splitMetaGroupSize = splitMetaGroupSize;
+        this.fetchSize = fetchSize;
+        this.distributionFactorUpper = distributionFactorUpper;
+        this.distributionFactorLower = distributionFactorLower;
+        this.startupOptions = startupOptions;
+        this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
+
+        // Schema and the mutable attributes derived from it
+        this.physicalSchema = physicalSchema;
+        this.producedDataType = physicalSchema.toPhysicalRowDataType();
+        this.metadataKeys = Collections.emptyList();
+    }
+
+    /**
+     * Declares the row kinds this source may emit: inserts, both images of an update, and
+     * deletes.
+     */
+    @Override
+    public ChangelogMode getChangelogMode() {
+        final ChangelogMode.Builder kinds = ChangelogMode.newBuilder();
+        kinds.addContainedKind(RowKind.INSERT);
+        kinds.addContainedKind(RowKind.UPDATE_BEFORE);
+        kinds.addContainedKind(RowKind.UPDATE_AFTER);
+        kinds.addContainedKind(RowKind.DELETE);
+        return kinds.build();
+    }
+
+    /**
+     * Builds the runtime provider that reads this table through a {@link
+     * DebeziumSourceFunction}.
+     *
+     * <p>The deserializer converts change events into {@link RowData} using the physical row
+     * type, the converters of the requested metadata keys and the configured server time zone.
+     * Only the connection settings, the Debezium properties, the startup options and the
+     * optional server id are handed to the source builder; {@code enableParallelRead} and the
+     * split/pool related options are not consulted by this method.
+     *
+     * @param scanContext planner context used to create the produced type information
+     * @return a provider wrapping the configured source function
+     * @throws NumberFormatException if {@code serverId} is set but is not a plain integer
+     */
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+        RowType physicalDataType =
+                (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
+        MetadataConverter[] metadataConverters = getMetadataConverters();
+        final TypeInformation<RowData> typeInfo =
+                scanContext.createTypeInformation(producedDataType);
+
+        DebeziumDeserializationSchema<RowData> deserializer =
+                RowDataDebeziumDeserializeSchema.newBuilder()
+                        .setPhysicalRowType(physicalDataType)
+                        .setMetadataConverters(metadataConverters)
+                        .setResultTypeInfo(typeInfo)
+                        .setServerTimeZone(serverTimeZone)
+                        .setUserDefinedConverterFactory(
+                                MySqlDeserializationConverterFactory.instance())
+                        .build();
+
+        // Parameterized builder type: keeps build() type-safe instead of relying on a raw type.
+        org.apache.inlong.sort.singletenant.flink.cdc.mysql.MySqlSource.Builder<RowData> builder =
+                org.apache.inlong.sort.singletenant.flink.cdc.mysql.MySqlSource.<RowData>builder()
+                        .hostname(hostname)
+                        .port(port)
+                        .databaseList(database)
+                        .tableList(database + "." + tableName)
+                        .username(username)
+                        .password(password)
+                        .serverTimeZone(serverTimeZone.toString())
+                        .debeziumProperties(dbzProperties)
+                        .startupOptions(startupOptions)
+                        .deserializer(deserializer);
+        // The server id is optional; it is only forwarded when the user configured one.
+        if (serverId != null) {
+            builder.serverId(Integer.parseInt(serverId));
+        }
+        DebeziumSourceFunction<RowData> sourceFunction = builder.build();
+        return SourceFunctionProvider.of(sourceFunction, false);
+    }
+
+    protected MetadataConverter[] getMetadataConverters() {
+        if (metadataKeys.isEmpty()) {
+            return new MetadataConverter[0];
+        }
+
+        return metadataKeys.stream()
+                .map(
+                        key ->
+                                Stream.of(MySqlReadableMetadata.values())
+                                        .filter(m -> m.getKey().equals(key))
+                                        .findFirst()
+                                        .orElseThrow(IllegalStateException::new))
+                .map(MySqlReadableMetadata::getConverter)
+                .toArray(MetadataConverter[]::new);
+    }
+
+    /**
+     * Lists every metadata key defined by {@link MySqlReadableMetadata} together with its data
+     * type.
+     */
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        final Stream<MySqlReadableMetadata> supported = Stream.of(MySqlReadableMetadata.values());
+        return supported.collect(
+                Collectors.toMap(meta -> meta.getKey(), meta -> meta.getDataType()));
+    }
+
+    /**
+     * Records the metadata keys selected for this source and the row type it must produce.
+     *
+     * @param metadataKeys metadata keys to expose; stored as given, without copying
+     * @param producedDataType data type of the rows this source has to emit
+     */
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+        this.producedDataType = producedDataType;
+        this.metadataKeys = metadataKeys;
+    }
+
+    /**
+     * Creates a new source with the same constructor arguments and then carries over the two
+     * attributes that the constructor resets: the metadata keys and the produced data type.
+     */
+    @Override
+    public DynamicTableSource copy() {
+        final MySqlTableSource duplicate =
+                new MySqlTableSource(
+                        physicalSchema,
+                        port,
+                        hostname,
+                        database,
+                        tableName,
+                        username,
+                        password,
+                        serverTimeZone,
+                        dbzProperties,
+                        serverId,
+                        enableParallelRead,
+                        splitSize,
+                        splitMetaGroupSize,
+                        fetchSize,
+                        connectTimeout,
+                        connectMaxRetries,
+                        connectionPoolSize,
+                        distributionFactorUpper,
+                        distributionFactorLower,
+                        startupOptions,
+                        scanNewlyAddedTableEnabled,
+                        jdbcProperties,
+                        heartbeatInterval);
+        // The constructor falls back to defaults for these two, so copy them explicitly.
+        duplicate.producedDataType = this.producedDataType;
+        duplicate.metadataKeys = this.metadataKeys;
+        return duplicate;
+    }
+
+    /**
+     * Compares every configuration attribute of this source, including the heartbeat interval.
+     *
+     * <p>The two distribution factors are compared with {@link Double#compare(double, double)}
+     * so the result stays consistent with {@link #hashCode()}, which hashes the boxed values
+     * ({@code 0.0} and {@code -0.0} hash differently).
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof MySqlTableSource)) {
+            return false;
+        }
+        MySqlTableSource that = (MySqlTableSource) o;
+        return port == that.port
+                && enableParallelRead == that.enableParallelRead
+                && splitSize == that.splitSize
+                && splitMetaGroupSize == that.splitMetaGroupSize
+                && fetchSize == that.fetchSize
+                && Double.compare(distributionFactorUpper, that.distributionFactorUpper) == 0
+                && Double.compare(distributionFactorLower, that.distributionFactorLower) == 0
+                && scanNewlyAddedTableEnabled == that.scanNewlyAddedTableEnabled
+                && Objects.equals(physicalSchema, that.physicalSchema)
+                && Objects.equals(hostname, that.hostname)
+                && Objects.equals(database, that.database)
+                && Objects.equals(username, that.username)
+                && Objects.equals(password, that.password)
+                && Objects.equals(serverId, that.serverId)
+                && Objects.equals(tableName, that.tableName)
+                && Objects.equals(serverTimeZone, that.serverTimeZone)
+                && Objects.equals(dbzProperties, that.dbzProperties)
+                && Objects.equals(connectTimeout, that.connectTimeout)
+                && Objects.equals(connectMaxRetries, that.connectMaxRetries)
+                && Objects.equals(connectionPoolSize, that.connectionPoolSize)
+                && Objects.equals(startupOptions, that.startupOptions)
+                && Objects.equals(producedDataType, that.producedDataType)
+                && Objects.equals(metadataKeys, that.metadataKeys)
+                && Objects.equals(jdbcProperties, that.jdbcProperties)
+                && Objects.equals(heartbeatInterval, that.heartbeatInterval);
+    }
+
+    /** Hashes the same attributes that {@link #equals(Object)} compares. */
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                physicalSchema,
+                port,
+                hostname,
+                database,
+                username,
+                password,
+                serverId,
+                tableName,
+                serverTimeZone,
+                dbzProperties,
+                enableParallelRead,
+                splitSize,
+                splitMetaGroupSize,
+                fetchSize,
+                connectTimeout,
+                connectMaxRetries,
+                connectionPoolSize,
+                distributionFactorUpper,
+                distributionFactorLower,
+                startupOptions,
+                producedDataType,
+                metadataKeys,
+                scanNewlyAddedTableEnabled,
+                jdbcProperties,
+                heartbeatInterval);
+    }
+
+    /** Returns the fixed summary label of this source. */
+    @Override
+    public String asSummaryString() {
+        final String summary = "MySQL-CDC";
+        return summary;
+    }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableSourceFactory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableSourceFactory.java
new file mode 100644
index 000000000..4ac6f90ab
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableSourceFactory.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.ServerIdRange;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.table.DebeziumOptions;
+
+import java.time.Duration;
+import java.time.ZoneId;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static com.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.DATABASE_NAME;
+import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.ObjectUtils.doubleCompare;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.CHUNK_META_GROUP_SIZE;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.CONNECTION_POOL_SIZE;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.CONNECT_MAX_RETRIES;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.CONNECT_TIMEOUT;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.HEARTBEAT_INTERVAL;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.HOSTNAME;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.PASSWORD;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.PORT;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.SCAN_STARTUP_MODE;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.SERVER_ID;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.SERVER_TIME_ZONE;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.TABLE_NAME;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.USERNAME;
+import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Factory for creating configured instance of {@link MySqlTableSource}.
+ *
+ * <p>The factory is registered under the identifier {@code mysql-cdc}. It reads the connector
+ * options from the table definition, validates them and passes them on to the table source.
+ */
+public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
+
+    private static final String IDENTIFIER = "mysql-cdc";
+
+    // Values of the startup-mode option; they are matched case-insensitively.
+    private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial";
+    private static final String SCAN_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset";
+    private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
+    private static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET = "specific-offset";
+    private static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp";
+
+    /**
+     * Creates the table source from the options of the catalog table.
+     *
+     * <p>Options starting with the Debezium prefix or the JDBC properties prefix are excluded
+     * from the generic option validation and forwarded as property bags. The database and table
+     * names must be valid regular expressions. The split, fetch, pool, retry and distribution
+     * factor options are only range-checked when the incremental snapshot option is enabled.
+     *
+     * @param context factory context giving access to the catalog table and its options
+     * @return the configured {@link MySqlTableSource}
+     * @throws ValidationException if an option value is rejected by one of the validations that
+     *     report through this exception type
+     * @throws IllegalStateException if one of the {@code checkState}-based validations fails
+     */
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        final FactoryUtil.TableFactoryHelper helper =
+                FactoryUtil.createTableFactoryHelper(this, context);
+        helper.validateExcept(
+                DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX, JdbcUrlUtils.PROPERTIES_PREFIX);
+
+        final ReadableConfig config = helper.getOptions();
+        final String hostname = config.get(HOSTNAME);
+        final String username = config.get(USERNAME);
+        final String password = config.get(PASSWORD);
+        final String databaseName = config.get(DATABASE_NAME);
+        validateRegex(DATABASE_NAME.key(), databaseName);
+        final String tableName = config.get(TABLE_NAME);
+        validateRegex(TABLE_NAME.key(), tableName);
+        int port = config.get(PORT);
+        int splitSize = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
+        int splitMetaGroupSize = config.get(CHUNK_META_GROUP_SIZE);
+        int fetchSize = config.get(SCAN_SNAPSHOT_FETCH_SIZE);
+        ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE));
+
+        ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
+        String serverId = validateAndGetServerId(config);
+        StartupOptions startupOptions = getStartupOptions(config);
+        Duration connectTimeout = config.get(CONNECT_TIMEOUT);
+        int connectMaxRetries = config.get(CONNECT_MAX_RETRIES);
+        int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
+        double distributionFactorUpper = config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
+        double distributionFactorLower = config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
+        boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
+        Duration heartbeatInterval = config.get(HEARTBEAT_INTERVAL);
+
+        // The remaining checks only apply to the incremental (parallel) snapshot settings.
+        boolean enableParallelRead = config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
+        if (enableParallelRead) {
+            validatePrimaryKeyIfEnableParallel(physicalSchema);
+            validateStartupOptionIfEnableParallel(startupOptions);
+            validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
+            validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
+            validateIntegerOption(SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1);
+            validateIntegerOption(CONNECTION_POOL_SIZE, connectionPoolSize, 1);
+            validateIntegerOption(CONNECT_MAX_RETRIES, connectMaxRetries, 0);
+            validateDistributionFactorUpper(distributionFactorUpper);
+            validateDistributionFactorLower(distributionFactorLower);
+        }
+
+        return new MySqlTableSource(
+                physicalSchema,
+                port,
+                hostname,
+                databaseName,
+                tableName,
+                username,
+                password,
+                serverTimeZone,
+                getDebeziumProperties(context.getCatalogTable().getOptions()),
+                serverId,
+                enableParallelRead,
+                splitSize,
+                splitMetaGroupSize,
+                fetchSize,
+                connectTimeout,
+                connectMaxRetries,
+                connectionPoolSize,
+                distributionFactorUpper,
+                distributionFactorLower,
+                startupOptions,
+                scanNewlyAddedTableEnabled,
+                JdbcUrlUtils.getJdbcProperties(context.getCatalogTable().getOptions()),
+                heartbeatInterval);
+    }
+
+    /** Returns the connector identifier {@code mysql-cdc}. */
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    /** Returns the options that must be present: host, credentials, database and table. */
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(HOSTNAME);
+        options.add(USERNAME);
+        options.add(PASSWORD);
+        options.add(DATABASE_NAME);
+        options.add(TABLE_NAME);
+        return options;
+    }
+
+    /** Returns the options that may be present in addition to the required ones. */
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(PORT);
+        options.add(SERVER_TIME_ZONE);
+        options.add(SERVER_ID);
+        options.add(SCAN_STARTUP_MODE);
+        options.add(SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
+        options.add(SCAN_STARTUP_SPECIFIC_OFFSET_POS);
+        options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
+        options.add(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
+        options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
+        options.add(CHUNK_META_GROUP_SIZE);
+        options.add(SCAN_SNAPSHOT_FETCH_SIZE);
+        options.add(CONNECT_TIMEOUT);
+        options.add(CONNECTION_POOL_SIZE);
+        options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
+        options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
+        options.add(CONNECT_MAX_RETRIES);
+        options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
+        options.add(HEARTBEAT_INTERVAL);
+        return options;
+    }
+
+    /**
+     * Translates the startup-mode option into {@link StartupOptions}.
+     *
+     * <p>Only {@code initial} and {@code latest-offset} are accepted. The other known modes are
+     * rejected with a dedicated message, anything else is reported as an invalid value.
+     *
+     * @throws ValidationException if the mode is unsupported or unknown
+     */
+    private static StartupOptions getStartupOptions(ReadableConfig config) {
+        String modeString = config.get(SCAN_STARTUP_MODE);
+
+        // Locale.ROOT keeps the case folding independent of the JVM default locale; with a
+        // Turkish default locale "INITIAL" would otherwise not lower-case to "initial".
+        switch (modeString.toLowerCase(Locale.ROOT)) {
+            case SCAN_STARTUP_MODE_VALUE_INITIAL:
+                return StartupOptions.initial();
+
+            case SCAN_STARTUP_MODE_VALUE_LATEST:
+                return StartupOptions.latest();
+
+            case SCAN_STARTUP_MODE_VALUE_EARLIEST:
+            case SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET:
+            case SCAN_STARTUP_MODE_VALUE_TIMESTAMP:
+                throw new ValidationException(
+                        String.format(
+                                "Unsupported option value '%s', the options [%s, %s, %s] "
+                                    + "are not supported correctly, "
+                                    + "please do not use them until they're correctly supported",
+                                modeString,
+                                SCAN_STARTUP_MODE_VALUE_EARLIEST,
+                                SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET,
+                                SCAN_STARTUP_MODE_VALUE_TIMESTAMP));
+
+            default:
+                throw new ValidationException(
+                        String.format(
+                                "Invalid value for option '%s'. Supported values are [%s, %s], but was: %s",
+                                SCAN_STARTUP_MODE.key(),
+                                SCAN_STARTUP_MODE_VALUE_INITIAL,
+                                SCAN_STARTUP_MODE_VALUE_LATEST,
+                                modeString));
+        }
+    }
+
+    /**
+     * Requires the schema to declare a primary key.
+     *
+     * @throws ValidationException if the schema has no primary key
+     */
+    private void validatePrimaryKeyIfEnableParallel(ResolvedSchema physicalSchema) {
+        if (!physicalSchema.getPrimaryKey().isPresent()) {
+            // Report the option key, as the other messages of this class do, rather than the
+            // string form of the ConfigOption object.
+            throw new ValidationException(
+                    String.format(
+                            "The primary key is necessary when enable '%s' to 'true'",
+                            SCAN_INCREMENTAL_SNAPSHOT_ENABLED.key()));
+        }
+    }
+
+    /**
+     * Requires the startup mode to be {@code INITIAL} or {@code LATEST_OFFSET}.
+     *
+     * @throws IllegalStateException if another startup mode is configured
+     */
+    private void validateStartupOptionIfEnableParallel(StartupOptions startupOptions) {
+        // validate mode
+        Preconditions.checkState(
+                startupOptions.startupMode == StartupMode.INITIAL
+                        || startupOptions.startupMode == StartupMode.LATEST_OFFSET,
+                String.format(
+                        "MySql Parallel Source only supports startup mode 'initial' and 'latest-offset',"
+                                + " but actual is %s",
+                        startupOptions.startupMode));
+    }
+
+    /**
+     * Reads the server id option and, when it is set, checks that {@link ServerIdRange} can
+     * parse it.
+     *
+     * @return the raw option value, or {@code null} when the option is not set
+     * @throws ValidationException if the value cannot be parsed
+     */
+    private String validateAndGetServerId(ReadableConfig configuration) {
+        final String serverIdValue = configuration.get(SERVER_ID);
+        if (serverIdValue != null) {
+            // validation
+            try {
+                ServerIdRange.from(serverIdValue);
+            } catch (Exception e) {
+                throw new ValidationException(
+                        String.format(
+                                "The value of option 'server-id' is invalid: '%s'", serverIdValue),
+                        e);
+            }
+        }
+        return serverIdValue;
+    }
+
+    /**
+     * Checks the value of given integer option is valid.
+     *
+     * @param option the option being checked, used for the error message
+     * @param optionValue the configured value
+     * @param exclusiveMin the value must be strictly greater than this bound
+     * @throws IllegalStateException if the value is not greater than {@code exclusiveMin}
+     */
+    private void validateIntegerOption(
+            ConfigOption<Integer> option, int optionValue, int exclusiveMin) {
+        checkState(
+                optionValue > exclusiveMin,
+                String.format(
+                        "The value of option '%s' must larger than %d, but is %d",
+                        option.key(), exclusiveMin, optionValue));
+    }
+
+    /**
+     * Checks the given regular expression's syntax is valid.
+     *
+     * @param optionName the option name of the regex
+     * @param regex The regular expression to be checked
+     * @throws ValidationException If the expression's syntax is invalid
+     */
+    private void validateRegex(String optionName, String regex) {
+        try {
+            Pattern.compile(regex);
+        } catch (Exception e) {
+            throw new ValidationException(
+                    String.format(
+                            "The %s '%s' is not a valid regular expression", optionName, regex),
+                    e);
+        }
+    }
+
+    /**
+     * Checks the value of given evenly distribution factor upper bound is valid.
+     *
+     * @throws IllegalStateException if the value compares lower than {@code 1.0}
+     */
+    private void validateDistributionFactorUpper(double distributionFactorUpper) {
+        checkState(
+                doubleCompare(distributionFactorUpper, 1.0d) >= 0,
+                String.format(
+                        "The value of option '%s' must larger than or equals %s, but is %s",
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.key(),
+                        1.0d,
+                        distributionFactorUpper));
+    }
+
+    /**
+     * Checks the value of given evenly distribution factor lower bound is valid.
+     *
+     * @throws IllegalStateException if the value does not compare within {@code [0.0, 1.0]}
+     */
+    private void validateDistributionFactorLower(double distributionFactorLower) {
+        checkState(
+                doubleCompare(distributionFactorLower, 0.0d) >= 0
+                        && doubleCompare(distributionFactorLower, 1.0d) <= 0,
+                String.format(
+                        "The value of option '%s' must between %s and %s inclusively, but is %s",
+                        SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.key(),
+                        0.0d,
+                        1.0d,
+                        distributionFactorLower));
+    }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/StartupMode.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/StartupMode.java
new file mode 100644
index 000000000..3ffc2bef3
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/StartupMode.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql.table;
+
+/**
+ * Startup modes for the MySQL CDC Consumer.
+ *
+ * <p>Each constant corresponds to one static factory method of {@link StartupOptions}.
+ *
+ * @see StartupOptions
+ */
+public enum StartupMode {
+    /** Take an initial snapshot on first startup, then continue with the latest binlog. */
+    INITIAL,
+
+    /** Skip the snapshot and read from the beginning of the binlog. */
+    EARLIEST_OFFSET,
+
+    /** Skip the snapshot and read from the end of the binlog. */
+    LATEST_OFFSET,
+
+    /** Skip the snapshot and read the binlog from a given file and position. */
+    SPECIFIC_OFFSETS,
+
+    /** Skip the snapshot and read the binlog from a given timestamp. */
+    TIMESTAMP
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/StartupOptions.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/StartupOptions.java
new file mode 100644
index 000000000..67edcf744
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/StartupOptions.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql.table;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Debezium startup options.
+ *
+ * <p>Instances are immutable and are obtained through the static factory methods, one per
+ * {@link StartupMode}. Fields that do not apply to the chosen mode are {@code null}.
+ */
+public final class StartupOptions implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    public final StartupMode startupMode;
+    public final String specificOffsetFile;
+    public final Integer specificOffsetPos;
+    public final Long startupTimestampMillis;
+
+    /**
+     * Stores the given values and verifies that the ones required by {@code startupMode} are
+     * present.
+     */
+    private StartupOptions(
+            StartupMode startupMode,
+            String specificOffsetFile,
+            Integer specificOffsetPos,
+            Long startupTimestampMillis) {
+        this.startupMode = startupMode;
+        this.specificOffsetFile = specificOffsetFile;
+        this.specificOffsetPos = specificOffsetPos;
+        this.startupTimestampMillis = startupTimestampMillis;
+
+        switch (startupMode) {
+            case SPECIFIC_OFFSETS:
+                // Both the binlog file and the position inside it are mandatory.
+                checkNotNull(specificOffsetFile, "specificOffsetFile shouldn't be null");
+                checkNotNull(specificOffsetPos, "specificOffsetPos shouldn't be null");
+                break;
+            case TIMESTAMP:
+                checkNotNull(startupTimestampMillis, "startupTimestampMillis shouldn't be null");
+                break;
+            case INITIAL:
+            case EARLIEST_OFFSET:
+            case LATEST_OFFSET:
+                // These modes carry no additional arguments.
+                break;
+            default:
+                throw new UnsupportedOperationException(startupMode + " mode is not supported.");
+        }
+    }
+
+    /**
+     * Snapshot first, then stream: takes an initial snapshot of the monitored tables on first
+     * startup and afterwards keeps reading the latest binlog.
+     */
+    public static StartupOptions initial() {
+        return new StartupOptions(StartupMode.INITIAL, null, null, null);
+    }
+
+    /**
+     * No snapshot; reading starts at the beginning of the binlog. Use with care: this is only
+     * valid when the binlog is guaranteed to contain the entire history of the database.
+     */
+    public static StartupOptions earliest() {
+        return new StartupOptions(StartupMode.EARLIEST_OFFSET, null, null, null);
+    }
+
+    /**
+     * No snapshot; reading starts at the end of the binlog, so only changes made after the
+     * connector was started are seen.
+     */
+    public static StartupOptions latest() {
+        return new StartupOptions(StartupMode.LATEST_OFFSET, null, null, null);
+    }
+
+    /**
+     * No snapshot; reading starts at the given binlog offset.
+     *
+     * @param specificOffsetFile name of the binlog file to start from
+     * @param specificOffsetPos position inside that file
+     */
+    public static StartupOptions specificOffset(String specificOffsetFile, int specificOffsetPos) {
+        return new StartupOptions(
+                StartupMode.SPECIFIC_OFFSETS, specificOffsetFile, specificOffsetPos, null);
+    }
+
+    /**
+     * No snapshot; reading starts at the given timestamp.
+     *
+     * <p>The consumer will traverse the binlog from the beginning and ignore change events whose
+     * timestamp is smaller than the specified timestamp.
+     *
+     * @param startupTimestampMillis timestamp for the startup offsets, as milliseconds from epoch.
+     */
+    public static StartupOptions timestamp(long startupTimestampMillis) {
+        return new StartupOptions(StartupMode.TIMESTAMP, null, null, startupTimestampMillis);
+    }
+
+    /** Two instances are equal when their mode and all three mode arguments are equal. */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null) {
+            return false;
+        }
+        if (o.getClass() != getClass()) {
+            return false;
+        }
+        final StartupOptions other = (StartupOptions) o;
+        if (startupMode != other.startupMode) {
+            return false;
+        }
+        return Objects.equals(specificOffsetFile, other.specificOffsetFile)
+                && Objects.equals(specificOffsetPos, other.specificOffsetPos)
+                && Objects.equals(startupTimestampMillis, other.startupTimestampMillis);
+    }
+
+    /** Hashes the mode together with the three mode arguments. */
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                startupMode, specificOffsetFile, specificOffsetPos, startupTimestampMillis);
+    }
+}