Posted to commits@inlong.apache.org by zi...@apache.org on 2022/04/26 02:30:53 UTC

[incubator-inlong] branch master updated: [INLONG-3924][Sort] Add mysql cdc, support multiple meta data (#3924)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b09093a88 [INLONG-3924][Sort] Add mysql cdc, support multiple meta data (#3924)
b09093a88 is described below

commit b09093a88e6c7c359642a364fa30ed2db6a7a2e4
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Tue Apr 26 10:30:47 2022 +0800

    [INLONG-3924][Sort] Add mysql cdc, support multiple meta data (#3924)
---
 inlong-sort/sort-single-tenant/pom.xml             |  30 +
 .../debezium/DebeziumDeserializationSchema.java    |  42 ++
 .../flink/cdc/debezium/DebeziumSourceFunction.java | 576 ++++++++++++++++++
 .../JsonDebeziumDeserializationSchema.java         |  98 ++++
 .../StringDebeziumDeserializationSchema.java       |  49 ++
 .../singletenant/flink/cdc/debezium/Validator.java |  33 ++
 .../history/FlinkJsonTableChangeSerializer.java    | 207 +++++++
 .../debezium/internal/DebeziumChangeConsumer.java  | 103 ++++
 .../debezium/internal/DebeziumChangeFetcher.java   | 309 ++++++++++
 .../cdc/debezium/internal/DebeziumOffset.java      |  64 ++
 .../internal/DebeziumOffsetSerializer.java         |  41 ++
 .../debezium/internal/FlinkDatabaseHistory.java    | 116 ++++
 .../internal/FlinkDatabaseSchemaHistory.java       | 199 +++++++
 .../debezium/internal/FlinkOffsetBackingStore.java | 201 +++++++
 .../flink/cdc/debezium/internal/Handover.java      | 194 ++++++
 .../flink/cdc/debezium/internal/SchemaRecord.java  |  95 +++
 .../debezium/table/AppendMetadataCollector.java    |  63 ++
 .../flink/cdc/debezium/table/DebeziumOptions.java  |  53 ++
 .../table/DeserializationRuntimeConverter.java     |  31 +
 .../DeserializationRuntimeConverterFactory.java    |  47 ++
 .../cdc/debezium/table/MetadataConverter.java      |  36 ++
 .../table/RowDataDebeziumDeserializeSchema.java    | 651 +++++++++++++++++++++
 .../cdc/debezium/utils/DatabaseHistoryUtil.java    |  86 +++
 .../cdc/debezium/utils/TemporalConversions.java    | 216 +++++++
 pom.xml                                            |  32 +-
 25 files changed, 3571 insertions(+), 1 deletion(-)

diff --git a/inlong-sort/sort-single-tenant/pom.xml b/inlong-sort/sort-single-tenant/pom.xml
index f1b831651..29eb8ead2 100644
--- a/inlong-sort/sort-single-tenant/pom.xml
+++ b/inlong-sort/sort-single-tenant/pom.xml
@@ -31,6 +31,11 @@
 
     <artifactId>sort-single-tenant</artifactId>
     <name>Apache InLong - Sort Single Tenant</name>
+    <properties>
+        <flink-connector-debezium.version>2.0.1</flink-connector-debezium.version>
+        <debezium-connector-mysql.version>1.5.4.Final</debezium-connector-mysql.version>
+        <debezium-core.version>1.5.4.Final</debezium-core.version>
+    </properties>
     <packaging>jar</packaging>
 
     <dependencies>
@@ -181,6 +186,31 @@
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-connector-debezium</artifactId>
+            <version>${flink-connector-debezium.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>kafka-log4j-appender</artifactId>
+                    <groupId>org.apache.kafka</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-connector-mysql</artifactId>
+            <version>${debezium-connector-mysql.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-core</artifactId>
+            <version>${debezium-core.version}</version>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/DebeziumDeserializationSchema.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/DebeziumDeserializationSchema.java
new file mode 100644
index 000000000..a23fe10e3
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/DebeziumDeserializationSchema.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium;
+
+import io.debezium.relational.history.TableChanges.TableChange;
+import java.io.Serializable;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/**
+ * The deserialization schema describes how to turn the Debezium SourceRecord into data types
+ * (Java/Scala objects) that are processed by Flink.
+ *
+ * @param <T> The type created by the deserialization schema.
+ */
+@PublicEvolving
+public interface DebeziumDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+
+    /** Deserialize the Debezium record, which is represented as a Kafka {@link SourceRecord}. */
+    void deserialize(SourceRecord record, Collector<T> out) throws Exception;
+    
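+    /**
+     * Deserialize the Debezium record together with the {@link TableChange} that describes the
+     * current schema of the captured table.
+     */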
+    void deserialize(SourceRecord record, Collector<T> out, TableChange tableChange) throws Exception;
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/DebeziumSourceFunction.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/DebeziumSourceFunction.java
new file mode 100644
index 000000000..321b9ac14
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/DebeziumSourceFunction.java
@@ -0,0 +1,576 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium;
+
+import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
+import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
+
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.DebeziumChangeConsumer;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.DebeziumChangeFetcher;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.DebeziumOffset;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.DebeziumOffsetSerializer;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.FlinkDatabaseHistory;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.FlinkDatabaseSchemaHistory;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.FlinkOffsetBackingStore;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.Handover;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.SchemaRecord;
+import io.debezium.document.DocumentReader;
+import io.debezium.document.DocumentWriter;
+import io.debezium.embedded.Connect;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.engine.spi.OffsetCommitPolicy;
+import io.debezium.heartbeat.Heartbeat;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.commons.collections.map.LinkedMap;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data
+ * from databases into Flink.
+ *
+ * <p>There are two workers during the runtime. One worker periodically pulls records from the
+ * database and pushes the records into the {@link Handover}. The other worker consumes the records
+ * from the {@link Handover} and converts the records into Flink-style data. The reason for not
+ * using a single worker is that Debezium behaves differently in the snapshot phase and the
+ * streaming phase.
+ *
+ * <p>Here we use the {@link Handover} as the buffer to submit data from the producer to the
+ * consumer. Because the two threads don't communicate with each other directly, the error reporting
+ * also relies on the {@link Handover}. When the engine gets errors, it uses the {@link
+ * DebeziumEngine.CompletionCallback} to report errors to the {@link Handover} and wakes up the
+ * consumer to check the error. However, if the error comes from the Flink side, the source
+ * function just closes the engine and wakes up the producer.
+ *
+ * <p>If the execution is canceled or finishes (snapshot phase only), the exit logic is the same
+ * as the logic in the error reporting.
+ *
+ * <p>The source function participates in checkpointing and guarantees that no data is lost during a
+ * failure, and that the computation processes elements "exactly once".
+ *
+ * <p>Note: currently, the source function can't run in multiple parallel instances.
+ *
+ * <p>Please refer to Debezium's documentation for the available configuration properties:
+ * https://debezium.io/documentation/reference/1.5/development/engine.html#engine-properties
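+ *
+ * <p>A minimal usage sketch (illustrative only; the connection properties below are hypothetical
+ * placeholders and a real setup needs the full set of Debezium MySQL connector properties):
+ *
+ * <pre>{@code
+ * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ * Properties props = new Properties();
+ * props.setProperty("database.hostname", "localhost");
+ * props.setProperty("database.port", "3306");
+ * props.setProperty("database.user", "inlong");
+ * props.setProperty("database.password", "secret");
+ * DebeziumSourceFunction<String> source = new DebeziumSourceFunction<>(
+ *         new JsonDebeziumDeserializationSchema(), props, null, Validator.getDefaultValidator());
+ * env.addSource(source).print();
+ * }</pre>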
+ */
+@PublicEvolving
+public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
+        implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<T> {
+
+    private static final long serialVersionUID = -5808108641062931623L;
+
+    protected static final Logger LOG = LoggerFactory.getLogger(DebeziumSourceFunction.class);
+
+    /** State name of the consumer's partition offset states. */
+    public static final String OFFSETS_STATE_NAME = "offset-states";
+
+    /** State name of the consumer's history records state. */
+    public static final String HISTORY_RECORDS_STATE_NAME = "history-records-states";
+
+    /** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks. */
+    public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
+
+    /**
+     * The configuration key that indicates whether the Debezium MySQL Connector uses the legacy
+     * implementation or not.
+     */
+    public static final String LEGACY_IMPLEMENTATION_KEY = "internal.implementation";
+
+    /** The configuration value that represents the legacy implementation. */
+    public static final String LEGACY_IMPLEMENTATION_VALUE = "legacy";
+
+    // ---------------------------------------------------------------------------------------
+    // Properties
+    // ---------------------------------------------------------------------------------------
+
+    /** The schema to convert from Debezium's messages into Flink's objects. */
+    private final DebeziumDeserializationSchema<T> deserializer;
+
+    /** User-supplied properties for Kafka. */
+    private final Properties properties;
+
+    /** The specific binlog offset to read from on the first startup. */
+    private final @Nullable DebeziumOffset specificOffset;
+
+    /** Data for pending but uncommitted offsets. */
+    private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
+
+    /** Flag indicating whether the Debezium Engine is started. */
+    private volatile boolean debeziumStarted = false;
+
+    /** Validator to verify that the connected database satisfies the CDC connector's requirements. */
+    private final Validator validator;
+
+    // ---------------------------------------------------------------------------------------
+    // State
+    // ---------------------------------------------------------------------------------------
+
+    /**
+     * The offsets to restore to, if the consumer restores state from a checkpoint.
+     *
+     * <p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)}
+     * method.
+     *
+     * <p>Using a String because we are encoding the offset state in JSON bytes.
+     */
+    private transient volatile String restoredOffsetState;
+
+    /** Accessor for state in the operator state backend. */
+    private transient ListState<byte[]> offsetState;
+
+    /**
+     * State to store the history records, i.e. schema changes.
+     *
+     * @see FlinkDatabaseHistory
+     * @see FlinkDatabaseSchemaHistory
+     */
+    private transient ListState<String> schemaRecordsState;
+
+    // ---------------------------------------------------------------------------------------
+    // Worker
+    // ---------------------------------------------------------------------------------------
+
+    private transient ExecutorService executor;
+    private transient DebeziumEngine<?> engine;
+    /**
+     * Unique name of this Debezium Engine instance across all the jobs. Currently we randomly
+     * generate a UUID for it. This is used for {@link FlinkDatabaseHistory}.
+     */
+    private transient String engineInstanceName;
+
+    /** Consume the events from the engine and commit the offset to the engine. */
+    private transient DebeziumChangeConsumer changeConsumer;
+
+    /** The consumer to fetch records from {@link Handover}. */
+    private transient DebeziumChangeFetcher<T> debeziumChangeFetcher;
+
+    /** Buffers the events from the source and records the errors from Debezium. */
+    private transient Handover handover;
+
+    // ---------------------------------------------------------------------------------------
+
+    public DebeziumSourceFunction(
+            DebeziumDeserializationSchema<T> deserializer,
+            Properties properties,
+            @Nullable DebeziumOffset specificOffset,
+            Validator validator) {
+        this.deserializer = deserializer;
+        this.properties = properties;
+        this.specificOffset = specificOffset;
+        this.validator = validator;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        validator.validate();
+        super.open(parameters);
+        ThreadFactory threadFactory =
+                new ThreadFactoryBuilder().setNameFormat("debezium-engine").build();
+        this.executor = Executors.newSingleThreadExecutor(threadFactory);
+        this.handover = new Handover();
+        this.changeConsumer = new DebeziumChangeConsumer(handover);
+    }
+
+    // ------------------------------------------------------------------------
+    //  Checkpoint and restore
+    // ------------------------------------------------------------------------
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        OperatorStateStore stateStore = context.getOperatorStateStore();
+        this.offsetState =
+                stateStore.getUnionListState(
+                        new ListStateDescriptor<>(
+                                OFFSETS_STATE_NAME,
+                                PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO));
+        this.schemaRecordsState =
+                stateStore.getUnionListState(
+                        new ListStateDescriptor<>(
+                                HISTORY_RECORDS_STATE_NAME, BasicTypeInfo.STRING_TYPE_INFO));
+
+        if (context.isRestored()) {
+            restoreOffsetState();
+            restoreHistoryRecordsState();
+        } else {
+            if (specificOffset != null) {
+                byte[] serializedOffset =
+                        DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
+                restoredOffsetState = new String(serializedOffset, StandardCharsets.UTF_8);
+                LOG.info(
+                        "Consumer subtask {} starts to read from specified offset {}.",
+                        getRuntimeContext().getIndexOfThisSubtask(),
+                        restoredOffsetState);
+            } else {
+                LOG.info(
+                        "Consumer subtask {} has no restore state.",
+                        getRuntimeContext().getIndexOfThisSubtask());
+            }
+        }
+    }
+
+    private void restoreOffsetState() throws Exception {
+        for (byte[] serializedOffset : offsetState.get()) {
+            if (restoredOffsetState == null) {
+                restoredOffsetState = new String(serializedOffset, StandardCharsets.UTF_8);
+            } else {
+                throw new RuntimeException(
+                        "Debezium Source only supports a single task, "
+                                + "however, this is restored from multiple tasks.");
+            }
+        }
+        LOG.info(
+                "Consumer subtask {} restored offset state: {}.",
+                getRuntimeContext().getIndexOfThisSubtask(),
+                restoredOffsetState);
+    }
+
+    private void restoreHistoryRecordsState() throws Exception {
+        DocumentReader reader = DocumentReader.defaultReader();
+        ConcurrentLinkedQueue<SchemaRecord> historyRecords = new ConcurrentLinkedQueue<>();
+        int recordsCount = 0;
+        boolean firstEntry = true;
+        for (String record : schemaRecordsState.get()) {
+            if (firstEntry) {
+                // we store the engine instance name in the first element
+                this.engineInstanceName = record;
+                firstEntry = false;
+            } else {
+                // Put the records into the state. The database history should read, reorganize and
+                // register the state.
+                historyRecords.add(new SchemaRecord(reader.read(record)));
+                recordsCount++;
+            }
+        }
+        if (engineInstanceName != null) {
+            registerHistory(engineInstanceName, historyRecords);
+        }
+        LOG.info(
+                "Consumer subtask {} restored history records state: {} with {} records.",
+                getRuntimeContext().getIndexOfThisSubtask(),
+                engineInstanceName,
+                recordsCount);
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
+        if (handover.hasError()) {
+            LOG.debug("snapshotState() called on closed source");
+            throw new FlinkRuntimeException(
+                    "Call snapshotState() on closed source, checkpoint failed.");
+        } else {
+            snapshotOffsetState(functionSnapshotContext.getCheckpointId());
+            snapshotHistoryRecordsState();
+        }
+    }
+
+    private void snapshotOffsetState(long checkpointId) throws Exception {
+        offsetState.clear();
+
+        final DebeziumChangeFetcher<?> fetcher = this.debeziumChangeFetcher;
+
+        byte[] serializedOffset = null;
+        if (fetcher == null) {
+            // the fetcher has not yet been initialized, which means we need to return the
+            // originally restored offsets
+            if (restoredOffsetState != null) {
+                serializedOffset = restoredOffsetState.getBytes(StandardCharsets.UTF_8);
+            }
+        } else {
+            byte[] currentState = fetcher.snapshotCurrentState();
+            if (currentState == null && restoredOffsetState != null) {
+                // the fetcher has been initialized, but has not yet received any data,
+                // which means we need to return the originally restored offsets.
+                serializedOffset = restoredOffsetState.getBytes(StandardCharsets.UTF_8);
+            } else {
+                serializedOffset = currentState;
+            }
+        }
+
+        if (serializedOffset != null) {
+            offsetState.add(serializedOffset);
+            // the map cannot be asynchronously updated, because only one checkpoint call
+            // can happen on this function at a time: either snapshotState() or
+            // notifyCheckpointComplete()
+            pendingOffsetsToCommit.put(checkpointId, serializedOffset);
+            // truncate the map of pending offsets to commit, to prevent infinite growth
+            while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
+                pendingOffsetsToCommit.remove(0);
+            }
+        }
+    }
+
+    private void snapshotHistoryRecordsState() throws Exception {
+        schemaRecordsState.clear();
+
+        if (engineInstanceName != null) {
+            schemaRecordsState.add(engineInstanceName);
+            Collection<SchemaRecord> records = retrieveHistory(engineInstanceName);
+            DocumentWriter writer = DocumentWriter.defaultWriter();
+            for (SchemaRecord record : records) {
+                schemaRecordsState.add(writer.write(record.toDocument()));
+            }
+        }
+    }
+
+    @Override
+    public void run(SourceContext<T> sourceContext) throws Exception {
+        properties.setProperty("name", "engine");
+        properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
+        if (restoredOffsetState != null) {
+            // restored from state
+            properties.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, restoredOffsetState);
+        }
+        // DO NOT include schema change, e.g. DDL
+        properties.setProperty("include.schema.changes", "false");
+        // disable the offset flush totally
+        properties.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE));
+        // disable tombstones
+        properties.setProperty("tombstones.on.delete", "false");
+        if (engineInstanceName == null) {
+            // not restored from a recovery
+            engineInstanceName = UUID.randomUUID().toString();
+        }
+        // history instance name to initialize FlinkDatabaseHistory
+        properties.setProperty(
+                FlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME, engineInstanceName);
+        // we have to use a persisted DatabaseHistory implementation; otherwise, recovery can't
+        // continue to read the binlog
+        // see
+        // https://stackoverflow.com/questions/57147584/debezium-error-schema-isnt-know-to-this-connector
+        // and https://debezium.io/blog/2018/03/16/note-on-database-history-topic-configuration/
+        properties.setProperty("database.history", determineDatabase().getCanonicalName());
+
+        // we have to filter out the heartbeat events, otherwise the deserializer will fail
+        String dbzHeartbeatPrefix =
+                properties.getProperty(
+                        Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(),
+                        Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());
+        this.debeziumChangeFetcher =
+                new DebeziumChangeFetcher<>(
+                        sourceContext,
+                        deserializer,
+                        restoredOffsetState == null, // DB snapshot phase if restore state is null
+                        dbzHeartbeatPrefix,
+                        handover);
+
+        // create the engine with this configuration ...
+        this.engine =
+                DebeziumEngine.create(Connect.class)
+                        .using(properties)
+                        .notifying(changeConsumer)
+                        .using(OffsetCommitPolicy.always())
+                        .using(
+                                (success, message, error) -> {
+                                    if (success) {
+                                        // Close the handover and prepare to exit.
+                                        handover.close();
+                                    } else {
+                                        handover.reportError(error);
+                                    }
+                                })
+                        .build();
+
+        // run the engine asynchronously
+        executor.execute(engine);
+        debeziumStarted = true;
+
+        // initialize metrics
+        // make RuntimeContext#getMetricGroup compatible between Flink 1.13 and Flink 1.14
+        final Method getMetricGroupMethod =
+                getRuntimeContext().getClass().getMethod("getMetricGroup");
+        getMetricGroupMethod.setAccessible(true);
+        final MetricGroup metricGroup =
+                (MetricGroup) getMetricGroupMethod.invoke(getRuntimeContext());
+
+        metricGroup.gauge(
+                "currentFetchEventTimeLag",
+                (Gauge<Long>) () -> debeziumChangeFetcher.getFetchDelay());
+        metricGroup.gauge(
+                "currentEmitEventTimeLag",
+                (Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay());
+        metricGroup.gauge(
+                "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
+
+        // start the real debezium consumer
+        debeziumChangeFetcher.runFetchLoop();
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        if (!debeziumStarted) {
+            LOG.debug("notifyCheckpointComplete() called when engine is not started.");
+            return;
+        }
+
+        final DebeziumChangeFetcher<T> fetcher = this.debeziumChangeFetcher;
+        if (fetcher == null) {
+            LOG.debug("notifyCheckpointComplete() called on uninitialized source");
+            return;
+        }
+
+        try {
+            final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
+            if (posInMap == -1) {
+                LOG.warn(
+                        "Consumer subtask {} received confirmation for unknown checkpoint id {}",
+                        getRuntimeContext().getIndexOfThisSubtask(),
+                        checkpointId);
+                return;
+            }
+
+            byte[] serializedOffsets = (byte[]) pendingOffsetsToCommit.remove(posInMap);
+
+            // remove older checkpoints in map
+            for (int i = 0; i < posInMap; i++) {
+                pendingOffsetsToCommit.remove(0);
+            }
+
+            if (serializedOffsets == null || serializedOffsets.length == 0) {
+                LOG.debug(
+                        "Consumer subtask {} has empty checkpoint state.",
+                        getRuntimeContext().getIndexOfThisSubtask());
+                return;
+            }
+
+            DebeziumOffset offset =
+                    DebeziumOffsetSerializer.INSTANCE.deserialize(serializedOffsets);
+            changeConsumer.commitOffset(offset);
+        } catch (Exception e) {
+            // ignore exception if we are no longer running
+            LOG.warn("Ignore error when committing offset to database.", e);
+        }
+    }
+
+    @Override
+    public void cancel() {
+        // safely and gracefully stop the engine
+        shutdownEngine();
+        if (debeziumChangeFetcher != null) {
+            debeziumChangeFetcher.close();
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        cancel();
+
+        if (executor != null) {
+            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+        }
+
+        super.close();
+    }
+
+    /** Safely and gracefully stop the Debezium engine. */
+    private void shutdownEngine() {
+        try {
+            if (engine != null) {
+                engine.close();
+            }
+        } catch (IOException e) {
+            ExceptionUtils.rethrow(e);
+        } finally {
+            if (executor != null) {
+                executor.shutdownNow();
+            }
+
+            debeziumStarted = false;
+
+            if (handover != null) {
+                handover.close();
+            }
+        }
+    }
+
+    @Override
+    public TypeInformation<T> getProducedType() {
+        return deserializer.getProducedType();
+    }
+
+    @VisibleForTesting
+    public LinkedMap getPendingOffsetsToCommit() {
+        return pendingOffsetsToCommit;
+    }
+
+    @VisibleForTesting
+    public boolean getDebeziumStarted() {
+        return debeziumStarted;
+    }
+
+    private Class<?> determineDatabase() {
+        boolean isCompatibleWithLegacy =
+                FlinkDatabaseHistory.isCompatible(retrieveHistory(engineInstanceName));
+        if (LEGACY_IMPLEMENTATION_VALUE.equals(properties.get(LEGACY_IMPLEMENTATION_KEY))) {
+            // specifies the legacy implementation but the state may be incompatible
+            if (isCompatibleWithLegacy) {
+                return FlinkDatabaseHistory.class;
+            } else {
+                throw new IllegalStateException(
+                        "The configured option 'debezium.internal.implementation' is 'legacy', "
+                            + "but the state of the source is incompatible with this implementation, "
+                            + "you should remove the option.");
+            }
+        } else if (FlinkDatabaseSchemaHistory.isCompatible(retrieveHistory(engineInstanceName))) {
+            // tries the non-legacy first
+            return FlinkDatabaseSchemaHistory.class;
+        } else if (isCompatibleWithLegacy) {
+            // fallback to legacy if possible
+            return FlinkDatabaseHistory.class;
+        } else {
+            // impossible
+            throw new IllegalStateException("Can't determine which DatabaseHistory to use.");
+        }
+    }
+
+    @VisibleForTesting
+    public String getEngineInstanceName() {
+        return engineInstanceName;
+    }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/JsonDebeziumDeserializationSchema.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/JsonDebeziumDeserializationSchema.java
new file mode 100644
index 000000000..1539a3570
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/JsonDebeziumDeserializationSchema.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium;
+
+import io.debezium.relational.history.TableChanges.TableChange;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.storage.ConverterConfig;
+import org.apache.kafka.connect.storage.ConverterType;
+
+/**
+ * A JSON format implementation of {@link DebeziumDeserializationSchema} which deserializes the
+ * received {@link SourceRecord} to a JSON String.
+ */
+public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
+
+    private static final long serialVersionUID = 1L;
+
+    private transient JsonConverter jsonConverter;
+
+    /**
+     * Configuration for whether to enable {@link JsonConverterConfig#SCHEMAS_ENABLE_CONFIG} to
+     * include the schema in messages.
+     */
+    private final Boolean includeSchema;
+
+    /** The custom configurations for {@link JsonConverter}. */
+    private Map<String, Object> customConverterConfigs;
+
+    public JsonDebeziumDeserializationSchema() {
+        this(false);
+    }
+
+    public JsonDebeziumDeserializationSchema(Boolean includeSchema) {
+        this.includeSchema = includeSchema;
+    }
+
+    public JsonDebeziumDeserializationSchema(
+            Boolean includeSchema, Map<String, Object> customConverterConfigs) {
+        this.includeSchema = includeSchema;
+        this.customConverterConfigs = customConverterConfigs;
+    }
+
+    @Override
+    public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
+        if (jsonConverter == null) {
+            initializeJsonConverter();
+        }
+        byte[] bytes =
+                jsonConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
+        out.collect(new String(bytes));
+    }
+
+    @Override
+    public void deserialize(SourceRecord record, Collector<String> out, TableChange tableChange)
+        throws Exception {
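+        // no-op: this JSON implementation does not make use of the table schema change information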
+
+    }
+
+    /** Initialize {@link JsonConverter} with given configs. */
+    private void initializeJsonConverter() {
+        jsonConverter = new JsonConverter();
+        final HashMap<String, Object> configs = new HashMap<>(2);
+        configs.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName());
+        configs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, includeSchema);
+        if (customConverterConfigs != null) {
+            configs.putAll(customConverterConfigs);
+        }
+        jsonConverter.configure(configs);
+    }
+
+    @Override
+    public TypeInformation<String> getProducedType() {
+        return BasicTypeInfo.STRING_TYPE_INFO;
+    }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/StringDebeziumDeserializationSchema.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/StringDebeziumDeserializationSchema.java
new file mode 100644
index 000000000..1b7931abd
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/StringDebeziumDeserializationSchema.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium;
+
+import io.debezium.relational.history.TableChanges.TableChange;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/**
+ * A simple implementation of {@link DebeziumDeserializationSchema} which converts the received
+ * {@link SourceRecord} into a String.
+ */
+public class StringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
+    private static final long serialVersionUID = -3168848963265670603L;
+
+    @Override
+    public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
+        out.collect(record.toString());
+    }
+
+    @Override
+    public void deserialize(SourceRecord record, Collector<String> out, TableChange tableChange)
+        throws Exception {
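+        // no-op: this String implementation does not make use of the table schema change information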
+
+    }
+
+    @Override
+    public TypeInformation<String> getProducedType() {
+        return BasicTypeInfo.STRING_TYPE_INFO;
+    }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/Validator.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/Validator.java
new file mode 100644
index 000000000..f39147673
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/Validator.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium;
+
+import java.io.Serializable;
+
+/** Validator to verify that the connected database satisfies the CDC connector's requirements. */
+public interface Validator extends Serializable {
+
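+    /** Validates that the connected database satisfies the CDC connector's requirements. */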
+    void validate();
+
+    static Validator getDefaultValidator() {
+        return () -> {
+            // do nothing;
+        };
+    }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/history/FlinkJsonTableChangeSerializer.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/history/FlinkJsonTableChangeSerializer.java
new file mode 100644
index 000000000..856d7fa83
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/history/FlinkJsonTableChangeSerializer.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.history;
+
+import io.debezium.document.Array;
+import io.debezium.document.Array.Entry;
+import io.debezium.document.Document;
+import io.debezium.document.Value;
+import io.debezium.relational.Column;
+import io.debezium.relational.ColumnEditor;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableEditor;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import io.debezium.relational.history.TableChanges.TableChange;
+import io.debezium.relational.history.TableChanges.TableChangeType;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * The serializer responsible for converting {@link TableChanges} into a JSON format. Copied from
+ * io.debezium.relational.history.JsonTableChangeSerializer, but adds serialization/deserialization
+ * for a column's enumValues.
+ */
+public class FlinkJsonTableChangeSerializer implements TableChanges.TableChangesSerializer<Array> {
+
+    @Override
+    public Array serialize(TableChanges tableChanges) {
+        List<Value> values =
+                StreamSupport.stream(tableChanges.spliterator(), false)
+                        .map(this::toDocument)
+                        .map(Value::create)
+                        .collect(Collectors.toList());
+
+        return Array.create(values);
+    }
+
+    public Document toDocument(TableChange tableChange) {
+        Document document = Document.create();
+
+        document.setString("type", tableChange.getType().name());
+        document.setString("id", tableChange.getId().toDoubleQuotedString());
+        document.setDocument("table", toDocument(tableChange.getTable()));
+        return document;
+    }
+
+    private Document toDocument(Table table) {
+        Document document = Document.create();
+
+        document.set("defaultCharsetName", table.defaultCharsetName());
+        document.set("primaryKeyColumnNames", Array.create(table.primaryKeyColumnNames()));
+
+        List<Document> columns =
+                table.columns().stream().map(this::toDocument).collect(Collectors.toList());
+
+        document.setArray("columns", Array.create(columns));
+
+        return document;
+    }
+
+    private Document toDocument(Column column) {
+        Document document = Document.create();
+
+        document.setString("name", column.name());
+        document.setNumber("jdbcType", column.jdbcType());
+
+        if (column.nativeType() != Column.UNSET_INT_VALUE) {
+            document.setNumber("nativeType", column.nativeType());
+        }
+
+        document.setString("typeName", column.typeName());
+        document.setString("typeExpression", column.typeExpression());
+        document.setString("charsetName", column.charsetName());
+
+        if (column.length() != Column.UNSET_INT_VALUE) {
+            document.setNumber("length", column.length());
+        }
+
+        column.scale().ifPresent(s -> document.setNumber("scale", s));
+
+        document.setNumber("position", column.position());
+        document.setBoolean("optional", column.isOptional());
+        document.setBoolean("autoIncremented", column.isAutoIncremented());
+        document.setBoolean("generated", column.isGenerated());
+
+        // BEGIN FLINK MODIFICATION
+        document.setArray("enumValues", column.enumValues().toArray());
+        // END FLINK MODIFICATION
+
+        return document;
+    }
+
+    @Override
+    public TableChanges deserialize(Array array, boolean useCatalogBeforeSchema) {
+        TableChanges tableChanges = new TableChanges();
+
+        for (Entry entry : array) {
+            TableChange change =
+                    fromDocument(entry.getValue().asDocument(), useCatalogBeforeSchema);
+
+            if (change.getType() == TableChangeType.CREATE) {
+                tableChanges.create(change.getTable());
+            } else if (change.getType() == TableChangeType.ALTER) {
+                tableChanges.alter(change.getTable());
+            } else if (change.getType() == TableChangeType.DROP) {
+                tableChanges.drop(change.getTable());
+            }
+        }
+
+        return tableChanges;
+    }
+
+    private static Table fromDocument(TableId id, Document document) {
+        TableEditor editor =
+                Table.editor()
+                        .tableId(id)
+                        .setDefaultCharsetName(document.getString("defaultCharsetName"));
+
+        document.getArray("columns")
+                .streamValues()
+                .map(Value::asDocument)
+                .map(
+                        v -> {
+                            ColumnEditor columnEditor =
+                                    Column.editor()
+                                            .name(v.getString("name"))
+                                            .jdbcType(v.getInteger("jdbcType"));
+
+                            Integer nativeType = v.getInteger("nativeType");
+                            if (nativeType != null) {
+                                columnEditor.nativeType(nativeType);
+                            }
+
+                            columnEditor
+                                    .type(v.getString("typeName"), v.getString("typeExpression"))
+                                    .charsetName(v.getString("charsetName"));
+
+                            Integer length = v.getInteger("length");
+                            if (length != null) {
+                                columnEditor.length(length);
+                            }
+
+                            Integer scale = v.getInteger("scale");
+                            if (scale != null) {
+                                columnEditor.scale(scale);
+                            }
+
+                            columnEditor
+                                    .position(v.getInteger("position"))
+                                    .optional(v.getBoolean("optional"))
+                                    .autoIncremented(v.getBoolean("autoIncremented"))
+                                    .generated(v.getBoolean("generated"));
+
+                            // BEGIN FLINK MODIFICATION
+                            Array enumValues = v.getArray("enumValues");
+                            if (enumValues != null && !enumValues.isEmpty()) {
+                                columnEditor.enumValues(
+                                        enumValues
+                                                .streamValues()
+                                                .map(Value::asString)
+                                                .collect(Collectors.toList()));
+                            }
+                            // END FLINK MODIFICATION
+
+                            return columnEditor.create();
+                        })
+                .forEach(editor::addColumn);
+
+        editor.setPrimaryKeyNames(
+                document.getArray("primaryKeyColumnNames")
+                        .streamValues()
+                        .map(Value::asString)
+                        .collect(Collectors.toList()));
+
+        return editor.create();
+    }
+
+    public static TableChange fromDocument(Document document, boolean useCatalogBeforeSchema) {
+        TableChangeType type = TableChangeType.valueOf(document.getString("type"));
+        TableId id = TableId.parse(document.getString("id"), useCatalogBeforeSchema);
+        Table table = null;
+
+        if (type == TableChangeType.CREATE || type == TableChangeType.ALTER) {
+            table = fromDocument(id, document.getDocument("table"));
+        } else {
+            table = Table.editor().tableId(id).create();
+        }
+        return new TableChange(type, table);
+    }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumChangeConsumer.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumChangeConsumer.java
new file mode 100644
index 000000000..bb31eadec
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumChangeConsumer.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
+
+import io.debezium.embedded.EmbeddedEngineChangeEvent;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.engine.DebeziumEngine.RecordCommitter;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.annotation.Internal;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Consume debezium change events. */
+@Internal
+public class DebeziumChangeConsumer
+        implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
+    public static final String LAST_COMPLETELY_PROCESSED_LSN_KEY = "lsn_proc";
+    public static final String LAST_COMMIT_LSN_KEY = "lsn_commit";
+    private static final Logger LOG = LoggerFactory.getLogger(DebeziumChangeConsumer.class);
+
+    private final Handover handover;
+    // keep the modification visible to the source function
+    private volatile RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> currentCommitter;
+
+    public DebeziumChangeConsumer(Handover handover) {
+        this.handover = handover;
+    }
+
+    @Override
+    public void handleBatch(
+            List<ChangeEvent<SourceRecord, SourceRecord>> events,
+            RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> recordCommitter) {
+        try {
+            currentCommitter = recordCommitter;
+            handover.produce(events);
+        } catch (Throwable e) {
+            // Hold this exception in handover and trigger the fetcher to exit
+            handover.reportError(e);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public void commitOffset(DebeziumOffset offset) throws InterruptedException {
+        // Although the committer is read/written by multiple threads, it will not change
+        // frequently.
+        if (currentCommitter == null) {
+            LOG.info(
+                    "commitOffset() called on Debezium change consumer which doesn't receive records yet.");
+            return;
+        }
+
+        // only the offset is used
+        SourceRecord recordWrapper =
+                new SourceRecord(
+                        offset.sourcePartition,
+                        adjustSourceOffset((Map<String, Object>) offset.sourceOffset),
+                        "DUMMY",
+                        Schema.BOOLEAN_SCHEMA,
+                        true);
+        EmbeddedEngineChangeEvent<SourceRecord, SourceRecord> changeEvent =
+                new EmbeddedEngineChangeEvent<>(null, recordWrapper, recordWrapper);
+        currentCommitter.markProcessed(changeEvent);
+        currentCommitter.markBatchFinished();
+    }
+
+    /**
+     * We have to adjust the type of LSN values to Long, because they might be Integer after
+     * deserialization; however, {@code
+     * io.debezium.connector.postgresql.PostgresStreamingChangeEventSource#commitOffset(java.util.Map)}
+     * requires Long.
+     */
+    private Map<String, Object> adjustSourceOffset(Map<String, Object> sourceOffset) {
+        if (sourceOffset.containsKey(LAST_COMPLETELY_PROCESSED_LSN_KEY)) {
+            String value = sourceOffset.get(LAST_COMPLETELY_PROCESSED_LSN_KEY).toString();
+            sourceOffset.put(LAST_COMPLETELY_PROCESSED_LSN_KEY, Long.parseLong(value));
+        }
+        if (sourceOffset.containsKey(LAST_COMMIT_LSN_KEY)) {
+            String value = sourceOffset.get(LAST_COMMIT_LSN_KEY).toString();
+            sourceOffset.put(LAST_COMMIT_LSN_KEY, Long.parseLong(value));
+        }
+        return sourceOffset;
+    }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumChangeFetcher.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumChangeFetcher.java
new file mode 100644
index 000000000..ea44e7026
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumChangeFetcher.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
+
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.DebeziumDeserializationSchema;
+import io.debezium.connector.SnapshotRecord;
+import io.debezium.data.Envelope;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A handler that converts change messages from {@link DebeziumEngine} into Flink data. Because
+ * Debezium holds the lock differently depending on its mode (e.g. snapshot), the handler also
+ * needs different strategies: in the snapshot phase it has to hold the lock until the snapshot
+ * finishes, while in the non-snapshot phase it only needs to hold the lock when emitting the
+ * records.
+ *
+ * @param <T> The type of elements produced by the handler.
+ */
+@Internal
+public class DebeziumChangeFetcher<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DebeziumChangeFetcher.class);
+
+    private final SourceFunction.SourceContext<T> sourceContext;
+
+    /**
+     * The lock that guarantees that record emission and state updates are atomic, from the view of
+     * taking a checkpoint.
+     */
+    private final Object checkpointLock;
+
+    /** The schema to convert from Debezium's messages into Flink's objects. */
+    private final DebeziumDeserializationSchema<T> deserialization;
+
+    /** A collector to emit records in batch (bundle). */
+    private final DebeziumCollector debeziumCollector;
+
+    private final DebeziumOffset debeziumOffset;
+
+    private final DebeziumOffsetSerializer stateSerializer;
+
+    private final String heartbeatTopicPrefix;
+
+    private boolean isInDbSnapshotPhase;
+
+    private final Handover handover;
+
+    private volatile boolean isRunning = true;
+
+    // ---------------------------------------------------------------------------------------
+    // Metrics
+    // ---------------------------------------------------------------------------------------
+
+    /** Timestamp of change event. If the event is a snapshot event, the timestamp is 0L. */
+    private volatile long messageTimestamp = 0L;
+
+    /** The last record processing time. */
+    private volatile long processTime = 0L;
+
+    /**
+     * currentFetchEventTimeLag = FetchTime - messageTimestamp, where the FetchTime is the time the
+     * record is fetched into the source operator.
+     */
+    private volatile long fetchDelay = 0L;
+
+    /**
+     * emitDelay = EmitTime - messageTimestamp, where the EmitTime is the time the record leaves the
+     * source operator.
+     */
+    private volatile long emitDelay = 0L;
+
+    // ------------------------------------------------------------------------
+
+    public DebeziumChangeFetcher(
+            SourceFunction.SourceContext<T> sourceContext,
+            DebeziumDeserializationSchema<T> deserialization,
+            boolean isInDbSnapshotPhase,
+            String heartbeatTopicPrefix,
+            Handover handover) {
+        this.sourceContext = sourceContext;
+        this.checkpointLock = sourceContext.getCheckpointLock();
+        this.deserialization = deserialization;
+        this.isInDbSnapshotPhase = isInDbSnapshotPhase;
+        this.heartbeatTopicPrefix = heartbeatTopicPrefix;
+        this.debeziumCollector = new DebeziumCollector();
+        this.debeziumOffset = new DebeziumOffset();
+        this.stateSerializer = DebeziumOffsetSerializer.INSTANCE;
+        this.handover = handover;
+    }
+
+    /**
+     * Take a snapshot of the Debezium handler state.
+     *
+     * <p>Important: This method must be called under the checkpoint lock.
+     */
+    public byte[] snapshotCurrentState() throws Exception {
+        // this method assumes that the checkpoint lock is held
+        assert Thread.holdsLock(checkpointLock);
+        if (debeziumOffset.sourceOffset == null || debeziumOffset.sourcePartition == null) {
+            return null;
+        }
+
+        return stateSerializer.serialize(debeziumOffset);
+    }
+
+    /**
+     * Process change messages from the {@link Handover} and collect the processed messages by
+     * {@link Collector}.
+     */
+    public void runFetchLoop() throws Exception {
+        try {
+            // begin snapshot database phase
+            if (isInDbSnapshotPhase) {
+                List<ChangeEvent<SourceRecord, SourceRecord>> events = handover.pollNext();
+
+                synchronized (checkpointLock) {
+                    LOG.info(
+                            "Database snapshot phase can't perform checkpoint, acquired Checkpoint lock.");
+                    handleBatch(events);
+                    while (isRunning && isInDbSnapshotPhase) {
+                        handleBatch(handover.pollNext());
+                    }
+                }
+                LOG.info("Received record from streaming binlog phase, released checkpoint lock.");
+            }
+
+            // begin streaming binlog phase
+            while (isRunning) {
+                // If the handover is closed or has errors, exit.
+                // If there is no streaming phase, the handover will be closed by the engine.
+                handleBatch(handover.pollNext());
+            }
+        } catch (Handover.ClosedException e) {
+            // ignore
+        }
+    }
+
+    public void close() {
+        isRunning = false;
+        handover.close();
+    }
+
+    // ---------------------------------------------------------------------------------------
+    // Metric getter
+    // ---------------------------------------------------------------------------------------
+
+    /**
+     * The metric indicates delay from data generation to entry into the system.
+     *
+     * <p>Note: the metric is only available during the binlog phase; a value of 0 means the
+     * metric is unavailable.
+     */
+    public long getFetchDelay() {
+        return fetchDelay;
+    }
+
+    /**
+     * The metric indicates delay from data generation to leaving the source operator.
+     *
+     * <p>Note: the metric is only available during the binlog phase; a value of 0 means the
+     * metric is unavailable.
+     */
+    public long getEmitDelay() {
+        return emitDelay;
+    }
+
+    public long getIdleTime() {
+        return System.currentTimeMillis() - processTime;
+    }
+
+    // ---------------------------------------------------------------------------------------
+    // Helper
+    // ---------------------------------------------------------------------------------------
+
+    private void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> changeEvents)
+            throws Exception {
+        if (CollectionUtils.isEmpty(changeEvents)) {
+            return;
+        }
+        this.processTime = System.currentTimeMillis();
+
+        for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) {
+            SourceRecord record = event.value();
+            updateMessageTimestamp(record);
+            fetchDelay = isInDbSnapshotPhase ? 0L : processTime - messageTimestamp;
+
+            if (isHeartbeatEvent(record)) {
+                // keep offset update
+                synchronized (checkpointLock) {
+                    debeziumOffset.setSourcePartition(record.sourcePartition());
+                    debeziumOffset.setSourceOffset(record.sourceOffset());
+                }
+                // drop heartbeat events
+                continue;
+            }
+
+            deserialization.deserialize(record, debeziumCollector);
+
+            if (!isSnapshotRecord(record)) {
+                LOG.debug("Snapshot phase finishes.");
+                isInDbSnapshotPhase = false;
+            }
+
+            // emit the actual records. this also updates offset state atomically
+            emitRecordsUnderCheckpointLock(
+                    debeziumCollector.records, record.sourcePartition(), record.sourceOffset());
+        }
+    }
+
+    private void emitRecordsUnderCheckpointLock(
+            Queue<T> records, Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
+        // Emit the records. Use the checkpoint lock to guarantee
+        // atomicity of record emission and offset state update.
+        // The synchronized checkpointLock is reentrant. It's safe to sync again in snapshot mode.
+        synchronized (checkpointLock) {
+            T record;
+            while ((record = records.poll()) != null) {
+                emitDelay =
+                        isInDbSnapshotPhase ? 0L : System.currentTimeMillis() - messageTimestamp;
+                sourceContext.collect(record);
+            }
+            // update offset to state
+            debeziumOffset.setSourcePartition(sourcePartition);
+            debeziumOffset.setSourceOffset(sourceOffset);
+        }
+    }
+
+    private void updateMessageTimestamp(SourceRecord record) {
+        Schema schema = record.valueSchema();
+        Struct value = (Struct) record.value();
+        if (schema.field(Envelope.FieldName.SOURCE) == null) {
+            return;
+        }
+
+        Struct source = value.getStruct(Envelope.FieldName.SOURCE);
+        if (source.schema().field(Envelope.FieldName.TIMESTAMP) == null) {
+            return;
+        }
+
+        Long tsMs = source.getInt64(Envelope.FieldName.TIMESTAMP);
+        if (tsMs != null) {
+            this.messageTimestamp = tsMs;
+        }
+    }
+
+    private boolean isHeartbeatEvent(SourceRecord record) {
+        String topic = record.topic();
+        return topic != null && topic.startsWith(heartbeatTopicPrefix);
+    }
+
+    private boolean isSnapshotRecord(SourceRecord record) {
+        Struct value = (Struct) record.value();
+        if (value != null) {
+            Struct source = value.getStruct(Envelope.FieldName.SOURCE);
+            SnapshotRecord snapshotRecord = SnapshotRecord.fromSource(source);
+            // even if it is the last record of snapshot, i.e. SnapshotRecord.LAST
+            // we can still recover from checkpoint and continue to read the binlog,
+            // because the checkpoint contains binlog position
+            return SnapshotRecord.TRUE == snapshotRecord;
+        }
+        return false;
+    }
+
+    // ---------------------------------------------------------------------------------------
+
+    private class DebeziumCollector implements Collector<T> {
+
+        private final Queue<T> records = new ArrayDeque<>();
+
+        @Override
+        public void collect(T record) {
+            records.add(record);
+        }
+
+        @Override
+        public void close() {
+        }
+    }
+}
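
The fetcher's central contract is that record emission and offset updates happen atomically with respect to checkpoints. A stripped-down sketch of that pattern under an assumed checkpoint lock (the names below are illustrative, not part of this commit):

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayDeque;
    import java.util.Queue;

    public class CheckpointLockSketch {
        private final Object checkpointLock = new Object();
        private final Queue<String> pending = new ArrayDeque<>();
        private String lastOffset;

        void emitBatch(String newOffset) {
            // Checkpoints synchronize on the same lock, so they never see emitted
            // records without the matching offset, or the offset without the records.
            synchronized (checkpointLock) {
                String element;
                while ((element = pending.poll()) != null) {
                    System.out.println("emit " + element); // sourceContext.collect(...) in the real fetcher
                }
                lastOffset = newOffset;
            }
        }

        byte[] snapshotState() {
            // called with the checkpoint lock already held, like snapshotCurrentState() above
            assert Thread.holdsLock(checkpointLock);
            return lastOffset == null ? null : lastOffset.getBytes(StandardCharsets.UTF_8);
        }
    }
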
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumOffset.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumOffset.java
new file mode 100644
index 000000000..a51aad8d4
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumOffset.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.flink.annotation.Internal;
+
+/**
+ * The state that the Flink Debezium Consumer holds for each instance.
+ *
+ * <p>This class describes the most basic state that Debezium uses for recovery, based on the
+ * Kafka Connect mechanism. It includes a sourcePartition and a sourceOffset.
+ *
+ * <p>The sourcePartition represents a single input sourcePartition that the record came from (e.g.
+ * a filename, table name, or topic-partition). The sourceOffset represents a position in that
+ * sourcePartition which can be used to resume consumption of data.
+ *
+ * <p>These values can have arbitrary structure and should be represented using
+ * org.apache.kafka.connect.data objects (or primitive values). For example, a database connector
+ * might specify the sourcePartition as a record containing { "db": "database_name", "table":
+ * "table_name"} and the sourceOffset as a Long containing the timestamp of the row.
+ */
+@Internal
+public class DebeziumOffset implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    public Map<String, ?> sourcePartition;
+    public Map<String, ?> sourceOffset;
+
+    public void setSourcePartition(Map<String, ?> sourcePartition) {
+        this.sourcePartition = sourcePartition;
+    }
+
+    public void setSourceOffset(Map<String, ?> sourceOffset) {
+        this.sourceOffset = sourceOffset;
+    }
+
+    @Override
+    public String toString() {
+        return "DebeziumOffset{"
+                + "sourcePartition="
+                + sourcePartition
+                + ", sourceOffset="
+                + sourceOffset
+                + '}';
+    }
+}
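
As an illustration (not part of this commit), a MySQL binlog position could be captured like this, assuming the class above is on the classpath; the key names mirror what the Debezium MySQL connector typically emits, but are shown here only as an example:

    import java.util.HashMap;
    import java.util.Map;

    public class DebeziumOffsetExample {
        public static void main(String[] args) {
            DebeziumOffset offset = new DebeziumOffset();

            Map<String, String> partition = new HashMap<>();
            partition.put("server", "mysql_binlog_source");
            offset.setSourcePartition(partition);

            Map<String, Object> position = new HashMap<>();
            position.put("file", "mysql-bin.000003");
            position.put("pos", 154L);
            offset.setSourceOffset(position);

            // prints DebeziumOffset{sourcePartition={server=...}, sourceOffset={file=..., pos=154}}
            System.out.println(offset);
        }
    }
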
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumOffsetSerializer.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumOffsetSerializer.java
new file mode 100644
index 000000000..9f193a8d9
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumOffsetSerializer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
+
+import java.io.IOException;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+/** Serializer implementation for a {@link DebeziumOffset}. */
+@Internal
+public class DebeziumOffsetSerializer {
+    public static final DebeziumOffsetSerializer INSTANCE = new DebeziumOffsetSerializer();
+
+    public byte[] serialize(DebeziumOffset debeziumOffset) throws IOException {
+        // we currently use JSON serialization for simplification, as the state is very small.
+        // we can improve this in the future if needed
+        ObjectMapper objectMapper = new ObjectMapper();
+        return objectMapper.writeValueAsBytes(debeziumOffset);
+    }
+
+    public DebeziumOffset deserialize(byte[] bytes) throws IOException {
+        ObjectMapper objectMapper = new ObjectMapper();
+        return objectMapper.readValue(bytes, DebeziumOffset.class);
+    }
+}
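
A quick round-trip sketch of the serializer (assuming the two classes above are on the classpath; the offset values are illustrative):

    import java.util.Collections;

    public class OffsetSerializerRoundTrip {
        public static void main(String[] args) throws Exception {
            DebeziumOffset offset = new DebeziumOffset();
            offset.setSourcePartition(Collections.singletonMap("server", "mysql_binlog_source"));
            offset.setSourceOffset(Collections.singletonMap("pos", 154L));

            byte[] state = DebeziumOffsetSerializer.INSTANCE.serialize(offset);
            DebeziumOffset restored = DebeziumOffsetSerializer.INSTANCE.deserialize(state);

            // Note: after the JSON round trip, small numeric values come back as Integer,
            // which is exactly why DebeziumChangeConsumer#adjustSourceOffset exists.
            System.out.println(restored);
        }
    }
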
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkDatabaseHistory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkDatabaseHistory.java
new file mode 100644
index 000000000..77315db48
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkDatabaseHistory.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
+
+import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
+import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.DatabaseHistoryUtil.removeHistory;
+import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
+
+import io.debezium.config.Configuration;
+import io.debezium.relational.history.AbstractDatabaseHistory;
+import io.debezium.relational.history.DatabaseHistoryException;
+import io.debezium.relational.history.DatabaseHistoryListener;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.HistoryRecordComparator;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Consumer;
+
+/**
+ * Inspired by {@link io.debezium.relational.history.MemoryDatabaseHistory}, but the
+ * HistoryRecords are stored in Flink's state for persistence.
+ *
+ * <p>Note: This is not a clean solution because it depends on a global variable and all the
+ * history records are kept in state (growing indefinitely). We may need to come up with a
+ * FileSystemDatabaseHistory in the future to store the history in HDFS.
+ */
+public class FlinkDatabaseHistory extends AbstractDatabaseHistory {
+
+    public static final String DATABASE_HISTORY_INSTANCE_NAME = "database.history.instance.name";
+
+    private ConcurrentLinkedQueue<SchemaRecord> schemaRecords;
+    private String instanceName;
+
+    /** Gets the registered HistoryRecords under the given instance name. */
+    private ConcurrentLinkedQueue<SchemaRecord> getRegisteredHistoryRecord(String instanceName) {
+        Collection<SchemaRecord> historyRecords = retrieveHistory(instanceName);
+        return new ConcurrentLinkedQueue<>(historyRecords);
+    }
+
+    @Override
+    public void configure(
+            Configuration config,
+            HistoryRecordComparator comparator,
+            DatabaseHistoryListener listener,
+            boolean useCatalogBeforeSchema) {
+        super.configure(config, comparator, listener, useCatalogBeforeSchema);
+        this.instanceName = config.getString(DATABASE_HISTORY_INSTANCE_NAME);
+        this.schemaRecords = getRegisteredHistoryRecord(instanceName);
+
+        // register the schema changes into state
+        // every change should be visible to the source function
+        registerHistory(instanceName, schemaRecords);
+    }
+
+    @Override
+    public void stop() {
+        super.stop();
+        removeHistory(instanceName);
+    }
+
+    @Override
+    protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
+        this.schemaRecords.add(new SchemaRecord(record));
+    }
+
+    @Override
+    protected void recoverRecords(Consumer<HistoryRecord> records) {
+        this.schemaRecords.stream().map(SchemaRecord::getHistoryRecord).forEach(records);
+    }
+
+    @Override
+    public boolean exists() {
+        return !schemaRecords.isEmpty();
+    }
+
+    @Override
+    public boolean storageExists() {
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "Flink Database History";
+    }
+
+    /**
+     * Determine whether the {@link FlinkDatabaseHistory} is compatible with the specified state.
+     */
+    public static boolean isCompatible(Collection<SchemaRecord> records) {
+        for (SchemaRecord record : records) {
+            // check the source/position/ddl is not null
+            if (!record.isHistoryRecord()) {
+                return false;
+            } else {
+                break;
+            }
+        }
+        return true;
+    }
+}
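
For illustration only, Debezium can be pointed at this history implementation through its ordinary properties; 'database.history' is the standard Debezium setting, while the instance name value below is made up for the example:

    import java.util.Properties;
    import java.util.UUID;

    public class DatabaseHistoryConfigSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("database.history", FlinkDatabaseHistory.class.getCanonicalName());
            // each source task registers its schema records under a unique instance name
            props.setProperty(FlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME,
                    "mysql-history-" + UUID.randomUUID());
            System.out.println(props);
        }
    }
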
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkDatabaseSchemaHistory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkDatabaseSchemaHistory.java
new file mode 100644
index 000000000..1027b18ed
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkDatabaseSchemaHistory.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
+
+import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
+import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.DatabaseHistoryUtil.removeHistory;
+import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
+import static io.debezium.relational.history.TableChanges.TableChange;
+
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer;
+import io.debezium.config.Configuration;
+import io.debezium.relational.TableId;
+import io.debezium.relational.Tables;
+import io.debezium.relational.ddl.DdlParser;
+import io.debezium.relational.history.DatabaseHistory;
+import io.debezium.relational.history.DatabaseHistoryException;
+import io.debezium.relational.history.DatabaseHistoryListener;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.HistoryRecordComparator;
+import io.debezium.relational.history.TableChanges;
+import io.debezium.schema.DatabaseSchema;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * The {@link FlinkDatabaseSchemaHistory} only stores the latest schema of each monitored table.
+ * When recovering from a checkpoint, it applies all the tables to the {@link
+ * DatabaseSchema} directly, so the history no longer needs to be replayed.
+ *
+ * <p>Because the data structure maintained in the {@link FlinkDatabaseSchemaHistory} is very
+ * different from that of the {@link FlinkDatabaseHistory}, the two are not compatible. Since it
+ * only keeps the latest schema of each table rather than all historical DDLs, it helps prevent
+ * OOM when there is a massive number of history DDLs.
+ */
+public class FlinkDatabaseSchemaHistory implements DatabaseHistory {
+
+    public static final String DATABASE_HISTORY_INSTANCE_NAME = "database.history.instance.name";
+
+    private final FlinkJsonTableChangeSerializer tableChangesSerializer =
+            new FlinkJsonTableChangeSerializer();
+
+    private ConcurrentMap<TableId, SchemaRecord> latestTables;
+    private String instanceName;
+    private DatabaseHistoryListener listener;
+    private boolean storeOnlyMonitoredTablesDdl;
+    private boolean skipUnparseableDDL;
+    private boolean useCatalogBeforeSchema;
+
+    @Override
+    public void configure(
+            Configuration config,
+            HistoryRecordComparator comparator,
+            DatabaseHistoryListener listener,
+            boolean useCatalogBeforeSchema) {
+        this.instanceName = config.getString(DATABASE_HISTORY_INSTANCE_NAME);
+        this.listener = listener;
+        this.storeOnlyMonitoredTablesDdl = config.getBoolean(STORE_ONLY_MONITORED_TABLES_DDL);
+        this.skipUnparseableDDL = config.getBoolean(SKIP_UNPARSEABLE_DDL_STATEMENTS);
+        this.useCatalogBeforeSchema = useCatalogBeforeSchema;
+
+        // recover
+        this.latestTables = new ConcurrentHashMap<>();
+        for (SchemaRecord schemaRecord : retrieveHistory(instanceName)) {
+            // validate here
+            TableChange tableChange =
+                    FlinkJsonTableChangeSerializer.fromDocument(
+                            schemaRecord.toDocument(), useCatalogBeforeSchema);
+            latestTables.put(tableChange.getId(), schemaRecord);
+        }
+        // register
+        registerHistory(instanceName, latestTables.values());
+    }
+
+    @Override
+    public void start() {
+        listener.started();
+    }
+
+    @Override
+    public void record(
+            Map<String, ?> source, Map<String, ?> position, String databaseName, String ddl)
+            throws DatabaseHistoryException {
+        throw new UnsupportedOperationException(
+                String.format(
+                        "The %s cannot work with 'debezium.internal.implementation' = 'legacy',"
+                                + "please use %s",
+                        FlinkDatabaseSchemaHistory.class.getCanonicalName(),
+                        FlinkDatabaseHistory.class.getCanonicalName()));
+    }
+
+    @Override
+    public void record(
+            Map<String, ?> source,
+            Map<String, ?> position,
+            String databaseName,
+            String schemaName,
+            String ddl,
+            TableChanges changes)
+            throws DatabaseHistoryException {
+        for (TableChanges.TableChange change : changes) {
+            switch (change.getType()) {
+                case CREATE:
+                case ALTER:
+                    latestTables.put(
+                            change.getId(),
+                            new SchemaRecord(tableChangesSerializer.toDocument(change)));
+                    break;
+                case DROP:
+                    latestTables.remove(change.getId());
+                    break;
+                default:
+                    // impossible
+                    throw new RuntimeException(
+                            String.format("Unknown change type: %s.", change.getType()));
+            }
+        }
+        listener.onChangeApplied(
+                new HistoryRecord(source, position, databaseName, schemaName, ddl, changes));
+    }
+
+    @Override
+    public void recover(
+            Map<String, ?> source, Map<String, ?> position, Tables schema, DdlParser ddlParser) {
+        listener.recoveryStarted();
+        for (SchemaRecord record : latestTables.values()) {
+            TableChange tableChange =
+                    FlinkJsonTableChangeSerializer.fromDocument(
+                            record.getTableChangeDoc(), useCatalogBeforeSchema);
+            schema.overwriteTable(tableChange.getTable());
+        }
+        listener.recoveryStopped();
+    }
+
+    @Override
+    public void stop() {
+        if (instanceName != null) {
+            removeHistory(instanceName);
+        }
+        listener.stopped();
+    }
+
+    @Override
+    public boolean exists() {
+        return latestTables != null && !latestTables.isEmpty();
+    }
+
+    @Override
+    public boolean storageExists() {
+        return true;
+    }
+
+    @Override
+    public void initializeStorage() {
+        // do nothing
+    }
+
+    @Override
+    public boolean storeOnlyMonitoredTables() {
+        return storeOnlyMonitoredTablesDdl;
+    }
+
+    @Override
+    public boolean skipUnparseableDdlStatements() {
+        return skipUnparseableDDL;
+    }
+
+    /**
+     * Determine whether the {@link FlinkDatabaseSchemaHistory} is compatible with the specified
+     * state.
+     */
+    public static boolean isCompatible(Collection<SchemaRecord> records) {
+        for (SchemaRecord record : records) {
+            if (!record.isTableChangeRecord()) {
+                return false;
+            } else {
+                break;
+            }
+        }
+        return true;
+    }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkOffsetBackingStore.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkOffsetBackingStore.java
new file mode 100644
index 000000000..d36516c29
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkOffsetBackingStore.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
+
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.DebeziumSourceFunction;
+import io.debezium.embedded.EmbeddedEngine;
+import io.debezium.engine.DebeziumEngine;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.OffsetBackingStore;
+import org.apache.kafka.connect.storage.OffsetStorageWriter;
+import org.apache.kafka.connect.util.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of {@link OffsetBackingStore} backed by Flink's state mechanism.
+ *
+ * <p>The {@link #OFFSET_STATE_VALUE} in the {@link WorkerConfig} is the raw position and offset
+ * data in JSON format. It is set into the config by {@link DebeziumSourceFunction} when
+ * recovering from a failover, before the {@link DebeziumEngine} is started. If this is not a
+ * restoration, the {@link #OFFSET_STATE_VALUE} is empty. {@link DebeziumEngine} relies on the
+ * {@link OffsetBackingStore} for failover recovery.
+ *
+ * @see DebeziumSourceFunction
+ */
+public class FlinkOffsetBackingStore implements OffsetBackingStore {
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkOffsetBackingStore.class);
+
+    public static final String OFFSET_STATE_VALUE = "offset.storage.flink.state.value";
+    public static final int FLUSH_TIMEOUT_SECONDS = 10;
+
+    protected Map<ByteBuffer, ByteBuffer> data = new HashMap<>();
+    protected ExecutorService executor;
+
+    @Override
+    public void configure(WorkerConfig config) {
+        // eagerly initialize the executor, because OffsetStorageWriter will use it later
+        start();
+
+        Map<String, ?> conf = config.originals();
+        if (!conf.containsKey(OFFSET_STATE_VALUE)) {
+            // a normal startup from a clean state, no need to initialize the offset
+            return;
+        }
+
+        String stateJson = (String) conf.get(OFFSET_STATE_VALUE);
+        DebeziumOffsetSerializer serializer = new DebeziumOffsetSerializer();
+        DebeziumOffset debeziumOffset;
+        try {
+            debeziumOffset = serializer.deserialize(stateJson.getBytes(StandardCharsets.UTF_8));
+        } catch (IOException e) {
+            LOG.error("Can't deserialize debezium offset state from JSON: " + stateJson, e);
+            throw new RuntimeException(e);
+        }
+
+        final String engineName = (String) conf.get(EmbeddedEngine.ENGINE_NAME.name());
+        Converter keyConverter = new JsonConverter();
+        Converter valueConverter = new JsonConverter();
+        keyConverter.configure(config.originals(), true);
+        Map<String, Object> valueConfigs = new HashMap<>(conf);
+        valueConfigs.put("schemas.enable", false);
+        valueConverter.configure(valueConfigs, true);
+        OffsetStorageWriter offsetWriter =
+                new OffsetStorageWriter(
+                        this,
+                        // must use engineName as namespace to align with Debezium Engine
+                        // implementation
+                        engineName,
+                        keyConverter,
+                        valueConverter);
+
+        offsetWriter.offset(debeziumOffset.sourcePartition, debeziumOffset.sourceOffset);
+
+        // flush immediately
+        if (!offsetWriter.beginFlush()) {
+            // if nothing is needed to be flushed, there must be something wrong with the
+            // initialization
+            LOG.warn(
+                    "Initialize FlinkOffsetBackingStore from empty offset state, this shouldn't happen.");
+            return;
+        }
+
+        // trigger flushing
+        Future<Void> flushFuture =
+                offsetWriter.doFlush(
+                        (error, result) -> {
+                            if (error != null) {
+                                LOG.error("Failed to flush initial offset.", error);
+                            } else {
+                                LOG.debug("Successfully flush initial offset.");
+                            }
+                        });
+
+        // wait until flushing finished
+        try {
+            flushFuture.get(FLUSH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+            LOG.info(
+                    "Flush offsets successfully, partition: {}, offsets: {}",
+                    debeziumOffset.sourcePartition,
+                    debeziumOffset.sourceOffset);
+        } catch (InterruptedException e) {
+            LOG.warn("Flush offsets interrupted, cancelling.", e);
+            offsetWriter.cancelFlush();
+        } catch (ExecutionException e) {
+            LOG.error("Flush offsets threw an unexpected exception.", e);
+            offsetWriter.cancelFlush();
+        } catch (TimeoutException e) {
+            LOG.error("Timed out waiting to flush offsets to storage.", e);
+            offsetWriter.cancelFlush();
+        }
+    }
+
+    @Override
+    public void start() {
+        if (executor == null) {
+            executor =
+                    Executors.newFixedThreadPool(
+                            1,
+                            ThreadUtils.createThreadFactory(
+                                    this.getClass().getSimpleName() + "-%d", false));
+        }
+    }
+
+    @Override
+    public void stop() {
+        if (executor != null) {
+            executor.shutdown();
+            // Best effort wait for any get() and set() tasks (and caller's callbacks) to complete.
+            try {
+                executor.awaitTermination(30, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+            if (!executor.shutdownNow().isEmpty()) {
+                throw new ConnectException(
+                        "Failed to stop FlinkOffsetBackingStore. Exiting without cleanly "
+                                + "shutting down pending tasks and/or callbacks.");
+            }
+            executor = null;
+        }
+    }
+
+    @Override
+    public Future<Map<ByteBuffer, ByteBuffer>> get(final Collection<ByteBuffer> keys) {
+        return executor.submit(
+                () -> {
+                    Map<ByteBuffer, ByteBuffer> result = new HashMap<>();
+                    for (ByteBuffer key : keys) {
+                        result.put(key, data.get(key));
+                    }
+                    return result;
+                });
+    }
+
+    @Override
+    public Future<Void> set(
+            final Map<ByteBuffer, ByteBuffer> values, final Callback<Void> callback) {
+        return executor.submit(
+                () -> {
+                    for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) {
+                        data.put(entry.getKey(), entry.getValue());
+                    }
+                    if (callback != null) {
+                        callback.onCompletion(null, null);
+                    }
+                    return null;
+                });
+    }
+}
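
A sketch of how a restored offset reaches the engine via configuration; 'offset.storage' is the standard Kafka Connect property, the JSON value is whatever was stored in Flink state, and the offset contents are illustrative (assuming the classes above are on the classpath):

    import java.nio.charset.StandardCharsets;
    import java.util.Collections;
    import java.util.Properties;

    public class OffsetBackingStoreConfigSketch {
        public static void main(String[] args) throws Exception {
            DebeziumOffset restored = new DebeziumOffset();
            restored.setSourcePartition(Collections.singletonMap("server", "mysql_binlog_source"));
            restored.setSourceOffset(Collections.singletonMap("pos", 154L));
            String offsetJson = new String(
                    DebeziumOffsetSerializer.INSTANCE.serialize(restored), StandardCharsets.UTF_8);

            Properties props = new Properties();
            props.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
            // picked up in configure(WorkerConfig) above and flushed as the initial offset
            props.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, offsetJson);
            System.out.println(props);
        }
    }
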
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/Handover.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/Handover.java
new file mode 100644
index 000000000..9af2ff246
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/Handover.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import io.debezium.engine.ChangeEvent;
+import java.io.Closeable;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Handover is a utility to hand over data (a buffer of records) and exceptions from a
+ * <i>producer</i> thread to a <i>consumer</i> thread. It effectively behaves like a "size one
+ * blocking queue", with some extras around exception reporting, closing, and waking up threads
+ * without {@link Thread#interrupt() interrupting} them.
+ *
+ * <p>This class is used in the Flink Debezium Engine Consumer to hand over data and exceptions
+ * between the thread that runs the DebeziumEngine class and the main thread.
+ *
+ * <p>The Handover can also be "closed", signalling from one thread to the other that it the thread
+ * has terminated.
+ */
+@ThreadSafe
+@Internal
+public class Handover implements Closeable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Handover.class);
+    private final Object lock = new Object();
+
+    @GuardedBy("lock")
+    private List<ChangeEvent<SourceRecord, SourceRecord>> next;
+
+    @GuardedBy("lock")
+    private Throwable error;
+
+    private boolean wakeupProducer;
+
+    /**
+     * Polls the next element from the Handover, possibly blocking until the next element is
+     * available. This method behaves similarly to polling from a blocking queue.
+     *
+     * <p>If an exception was handed in by the producer ({@link #reportError(Throwable)}), then that
+     * exception is thrown rather than an element being returned.
+     *
+     * @return The next element (buffer of records, never null).
+     * @throws ClosedException Thrown if the Handover was {@link #close() closed}.
+     * @throws Exception Rethrows exceptions from the {@link #reportError(Throwable)} method.
+     */
+    public List<ChangeEvent<SourceRecord, SourceRecord>> pollNext() throws Exception {
+        synchronized (lock) {
+            while (next == null && error == null) {
+                lock.wait();
+            }
+            List<ChangeEvent<SourceRecord, SourceRecord>> n = next;
+            if (n != null) {
+                next = null;
+                lock.notifyAll();
+                return n;
+            } else {
+                ExceptionUtils.rethrowException(error, error.getMessage());
+
+                // this statement cannot be reached since the above method always throws an
+                // exception; this is only here to silence the compiler and any warnings
+                return Collections.emptyList();
+            }
+        }
+    }
+
+    /**
+     * Hands over an element from the producer. If the Handover already has an element that was not
+     * yet picked up by the consumer thread, this call blocks until the consumer picks up that
+     * previous element.
+     *
+     * <p>This behavior is similar to a "size one" blocking queue.
+     *
+     * @param element The next element to hand over.
+     * @throws InterruptedException Thrown if the thread is interrupted while waiting for the
+     *     Handover to become empty.
+     */
+    public void produce(final List<ChangeEvent<SourceRecord, SourceRecord>> element)
+            throws InterruptedException {
+
+        checkNotNull(element);
+
+        synchronized (lock) {
+            while (next != null && !wakeupProducer) {
+                lock.wait();
+            }
+
+            wakeupProducer = false;
+
+            // an error marks this as closed for the producer
+            if (error != null) {
+                ExceptionUtils.rethrow(error, error.getMessage());
+            } else {
+                // if there is no error, then this is open and can accept this element
+                next = element;
+                lock.notifyAll();
+            }
+        }
+    }
+
+    /**
+     * Reports an exception. The consumer will throw the given exception immediately, if it is
+     * currently blocked in the {@link #pollNext()} method, or the next time it calls that method.
+     *
+     * <p>After this method has been called, no call to either {@link #produce(List)} or {@link
+     * #pollNext()} will ever return normally again; they will always throw an exception.
+     *
+     * <p>If another exception was already reported, this method does nothing.
+     *
+     * <p>For the producer, the Handover will appear as if it was {@link #close() closed}.
+     *
+     * @param t The exception to report.
+     */
+    public void reportError(Throwable t) {
+        checkNotNull(t);
+
+        synchronized (lock) {
+            LOG.error("Reporting error:", t);
+            // do not override the initial exception
+            if (error == null) {
+                error = t;
+            }
+            next = null;
+            lock.notifyAll();
+        }
+    }
+
+    /**
+     * Return whether there is an error.
+     *
+     * @return whether there is an error
+     */
+    public boolean hasError() {
+        return error != null;
+    }
+
+    /**
+     * Closes the handover. Both the {@link #produce(List)} method and the {@link #pollNext()} will
+     * throw a {@link ClosedException} on any currently blocking and future invocations.
+     *
+     * <p>If an exception was previously reported via the {@link #reportError(Throwable)} method,
+     * that exception will not be overridden. The consumer thread will throw that exception upon
+     * calling {@link #pollNext()}, rather than the {@code ClosedException}.
+     */
+    @Override
+    public void close() {
+        synchronized (lock) {
+            next = null;
+            wakeupProducer = false;
+
+            if (error == null) {
+                error = new ClosedException();
+            }
+            lock.notifyAll();
+        }
+    }
+
+    // ------------------------------------------------------------------------
+
+    /**
+     * An exception thrown by the Handover in the {@link #pollNext()} or {@link #produce(List)}
+     * method, after the Handover was closed via {@link #close()}.
+     */
+    public static final class ClosedException extends Exception {
+
+        private static final long serialVersionUID = 1L;
+    }
+}
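
A small sketch of the hand-over protocol between an engine (producer) thread and a fetcher (consumer) thread; empty batches are used only to show the control flow, real callers pass Debezium change events (assuming the Handover class above is on the classpath):

    import java.util.Collections;

    public class HandoverSketch {
        public static void main(String[] args) throws Exception {
            Handover handover = new Handover();

            Thread producer = new Thread(() -> {
                try {
                    for (int i = 0; i < 3; i++) {
                        handover.produce(Collections.emptyList()); // blocks while a batch is pending
                    }
                    handover.close(); // wakes the consumer up with a ClosedException
                } catch (Throwable t) {
                    handover.reportError(t);
                }
            });
            producer.start();

            try {
                while (true) {
                    System.out.println("got a batch of size " + handover.pollNext().size());
                }
            } catch (Handover.ClosedException expected) {
                // normal shutdown; a batch still in flight when close() runs may be dropped,
                // this sketch only demonstrates the control flow
            }
        }
    }
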
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/SchemaRecord.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/SchemaRecord.java
new file mode 100644
index 000000000..bcc48ce52
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/SchemaRecord.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
+
+import io.debezium.document.Document;
+import io.debezium.document.DocumentWriter;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.TableChanges.TableChange;
+import java.io.IOException;
+import javax.annotation.Nullable;
+
+/**
+ * The record represents a schema change event; it contains either one {@link HistoryRecord} or
+ * one {@link TableChange}.
+ *
+ * <p>The {@link HistoryRecord} is used by {@link FlinkDatabaseHistory}, which keeps the full
+ * history of table change events for all tables, while the {@link TableChange} is used by {@link
+ * FlinkDatabaseSchemaHistory}, which keeps only the latest table change for each table.
+ */
+public class SchemaRecord {
+
+    @Nullable private final HistoryRecord historyRecord;
+
+    @Nullable private final Document tableChangeDoc;
+
+    public SchemaRecord(HistoryRecord historyRecord) {
+        this.historyRecord = historyRecord;
+        this.tableChangeDoc = null;
+    }
+
+    public SchemaRecord(Document document) {
+        if (isHistoryRecordDocument(document)) {
+            this.historyRecord = new HistoryRecord(document);
+            this.tableChangeDoc = null;
+        } else {
+            this.tableChangeDoc = document;
+            this.historyRecord = null;
+        }
+    }
+
+    @Nullable
+    public HistoryRecord getHistoryRecord() {
+        return historyRecord;
+    }
+
+    @Nullable
+    public Document getTableChangeDoc() {
+        return tableChangeDoc;
+    }
+
+    public boolean isHistoryRecord() {
+        return historyRecord != null;
+    }
+
+    public boolean isTableChangeRecord() {
+        return tableChangeDoc != null;
+    }
+
+    public Document toDocument() {
+        if (historyRecord != null) {
+            return historyRecord.document();
+        } else {
+            return tableChangeDoc;
+        }
+    }
+
+    @Override
+    public String toString() {
+        try {
+            return DocumentWriter.defaultWriter().write(toDocument());
+        } catch (IOException e) {
+            return super.toString();
+        }
+    }
+
+    private boolean isHistoryRecordDocument(Document document) {
+        return new HistoryRecord(document).isValid();
+    }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/AppendMetadataCollector.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/AppendMetadataCollector.java
new file mode 100644
index 000000000..50430463c
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/AppendMetadataCollector.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.table;
+
+import io.debezium.relational.history.TableChanges.TableChange;
+import java.io.Serializable;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/** Emits a row with physical fields and metadata fields. */
+@Internal
+public final class AppendMetadataCollector implements Collector<RowData>, Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final MetadataConverter[] metadataConverters;
+
+    public transient SourceRecord inputRecord;
+    public transient Collector<RowData> outputCollector;
+
+    public AppendMetadataCollector(MetadataConverter[] metadataConverters) {
+        this.metadataConverters = metadataConverters;
+    }
+
+    public void collect(RowData physicalRow, TableChange tableSchema) {
+        GenericRowData metaRow = new GenericRowData(metadataConverters.length);
+        for (int i = 0; i < metadataConverters.length; i++) {
+            Object meta = metadataConverters[i].read(inputRecord, tableSchema);
+            metaRow.setField(i, meta);
+        }
+        RowData outRow = new JoinedRowData(physicalRow.getRowKind(), physicalRow, metaRow);
+        outputCollector.collect(outRow);
+    }
+
+    @Override
+    public void collect(RowData record) {
+
+    }
+
+    @Override
+    public void close() {
+        // nothing to do
+    }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/DebeziumOptions.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/DebeziumOptions.java
new file mode 100644
index 000000000..c40cfea3e
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/DebeziumOptions.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.table;
+
+import java.util.Map;
+import java.util.Properties;
+
+/** Option utils for Debezium options. */
+public class DebeziumOptions {
+    public static final String DEBEZIUM_OPTIONS_PREFIX = "debezium.";
+
+    public static Properties getDebeziumProperties(Map<String, String> properties) {
+        final Properties debeziumProperties = new Properties();
+
+        if (hasDebeziumProperties(properties)) {
+            properties.keySet().stream()
+                    .filter(key -> key.startsWith(DEBEZIUM_OPTIONS_PREFIX))
+                    .forEach(
+                            key -> {
+                                final String value = properties.get(key);
+                                final String subKey =
+                                        key.substring((DEBEZIUM_OPTIONS_PREFIX).length());
+                                debeziumProperties.put(subKey, value);
+                            });
+        }
+        return debeziumProperties;
+    }
+
+    /**
+     * Decides whether the table options contain Debezium client properties that start with the
+     * prefix 'debezium.'.
+     */
+    private static boolean hasDebeziumProperties(Map<String, String> debeziumOptions) {
+        return debeziumOptions.keySet().stream()
+                .anyMatch(k -> k.startsWith(DEBEZIUM_OPTIONS_PREFIX));
+    }
+}
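
Usage sketch: table options carrying the 'debezium.' prefix are passed through with the prefix stripped. The option names below are common Debezium settings, shown here only as an example:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    public class DebeziumOptionsSketch {
        public static void main(String[] args) {
            Map<String, String> tableOptions = new HashMap<>();
            tableOptions.put("hostname", "localhost");                    // no prefix, ignored here
            tableOptions.put("debezium.snapshot.mode", "initial");
            tableOptions.put("debezium.decimal.handling.mode", "string");

            Properties props = DebeziumOptions.getDebeziumProperties(tableOptions);
            // props now contains: snapshot.mode=initial, decimal.handling.mode=string
            System.out.println(props);
        }
    }
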
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/DeserializationRuntimeConverter.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/DeserializationRuntimeConverter.java
new file mode 100644
index 000000000..0b4d47029
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/DeserializationRuntimeConverter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.table;
+
+import java.io.Serializable;
+import org.apache.kafka.connect.data.Schema;
+
+/**
+ * Runtime converter that converts objects of Debezium into objects of Flink Table & SQL internal
+ * data structures.
+ */
+@FunctionalInterface
+public interface DeserializationRuntimeConverter extends Serializable {
+    Object convert(Object dbzObj, Schema schema) throws Exception;
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/DeserializationRuntimeConverterFactory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/DeserializationRuntimeConverterFactory.java
new file mode 100644
index 000000000..3d0861062
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/DeserializationRuntimeConverterFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.table;
+
+import java.io.Serializable;
+import java.time.ZoneId;
+import java.util.Optional;
+import org.apache.flink.table.types.logical.LogicalType;
+
+/**
+ * Factory to create {@link DeserializationRuntimeConverter} according to {@link LogicalType}. It's
+ * usually used to create a user-defined {@link DeserializationRuntimeConverter} which has a
+ * higher resolve order than the default converters.
+ */
+public interface DeserializationRuntimeConverterFactory extends Serializable {
+
+    /** A user-defined converter factory which always falls back to the default converters. */
+    DeserializationRuntimeConverterFactory DEFAULT =
+            (logicalType, serverTimeZone) -> Optional.empty();
+
+    /**
+     * Returns an optional {@link DeserializationRuntimeConverter}. Returns {@link Optional#empty()}
+     * if it falls back to the default converters.
+     *
+     * @param logicalType the Flink Table & SQL internal datatype to be converted from objects of
+     *     Debezium
+     * @param serverTimeZone TimeZone used to convert data with timestamp type
+     */
+    Optional<DeserializationRuntimeConverter> createUserDefinedConverter(
+            LogicalType logicalType, ZoneId serverTimeZone);
+}
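
A minimal sketch of a user-defined factory, assuming one only wants to override BIGINT handling and defer every other type to the default converters. Note that the converters in RowDataDebeziumDeserializeSchema later in this patch deliberately use anonymous classes instead of lambdas because of MSHADE-260, so an anonymous class may be safer in a shaded build.

    import java.util.Optional;
    import org.apache.flink.table.types.logical.LogicalTypeRoot;
    import org.apache.inlong.sort.singletenant.flink.cdc.debezium.table.DeserializationRuntimeConverter;
    import org.apache.inlong.sort.singletenant.flink.cdc.debezium.table.DeserializationRuntimeConverterFactory;

    public class BigintOverrideFactoryExample {
        // Illustrative factory: converts BIGINT itself, falls back to the defaults otherwise.
        public static final DeserializationRuntimeConverterFactory INSTANCE =
                (logicalType, serverTimeZone) -> {
                    if (logicalType.getTypeRoot() == LogicalTypeRoot.BIGINT) {
                        DeserializationRuntimeConverter converter =
                                (dbzObj, schema) -> Long.parseLong(dbzObj.toString());
                        return Optional.of(converter);
                    }
                    return Optional.empty(); // use the default converter for this type
                };
    }
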
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/MetadataConverter.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/MetadataConverter.java
new file mode 100644
index 000000000..8c3470841
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/MetadataConverter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.table;
+
+import io.debezium.relational.history.TableChanges;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/** A converter converts {@link SourceRecord} metadata into Flink internal data structures. */
+@FunctionalInterface
+@Internal
+public interface MetadataConverter extends Serializable {
+    Object read(SourceRecord record);
+
+    default Object read(SourceRecord record, @Nullable TableChanges.TableChange tableSchema) {
+        return read(record);
+    }
+}
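
A hedged sketch of a metadata converter that surfaces the originating table name; the "source"/"table" field names follow the Debezium MySQL envelope and are assumptions here, not something this interface prescribes.

    import org.apache.flink.table.data.StringData;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.inlong.sort.singletenant.flink.cdc.debezium.table.MetadataConverter;

    public class TableNameMetadataConverterExample implements MetadataConverter {
        private static final long serialVersionUID = 1L;

        @Override
        public Object read(SourceRecord record) {
            // Debezium places the originating table into the "source" struct of the value.
            Struct value = (Struct) record.value();
            Struct source = value.getStruct("source");
            return StringData.fromString(source.getString("table"));
        }
    }
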
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
new file mode 100644
index 000000000..b5cea2aef
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
@@ -0,0 +1,651 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.table;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.TemporalConversions;
+import io.debezium.data.Envelope;
+import io.debezium.data.SpecialValueDecimal;
+import io.debezium.data.VariableScaleDecimal;
+import io.debezium.relational.history.TableChanges.TableChange;
+import io.debezium.time.MicroTime;
+import io.debezium.time.MicroTimestamp;
+import io.debezium.time.NanoTime;
+import io.debezium.time.NanoTimestamp;
+import io.debezium.time.Timestamp;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Optional;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.kafka.connect.data.Decimal;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/**
+ * Deserialization schema from Debezium object to Flink Table/SQL internal data structure {@link
+ * RowData}.
+ */
+public final class RowDataDebeziumDeserializeSchema
+        implements DebeziumDeserializationSchema<RowData> {
+    private static final long serialVersionUID = 2L;
+
+    @Override
+    public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
+        deserialize(record, out, null);
+    }
+
+    @Override
+    public void deserialize(SourceRecord record, Collector<RowData> out, TableChange tableChange)
+        throws Exception {
+        Envelope.Operation op = Envelope.operationFor(record);
+        Struct value = (Struct) record.value();
+        Schema valueSchema = record.valueSchema();
+        if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
+            GenericRowData insert = extractAfterRow(value, valueSchema);
+            validator.validate(insert, RowKind.INSERT);
+            insert.setRowKind(RowKind.INSERT);
+            emit(record, insert, out, tableChange);
+        } else if (op == Envelope.Operation.DELETE) {
+            GenericRowData delete = extractBeforeRow(value, valueSchema);
+            validator.validate(delete, RowKind.DELETE);
+            delete.setRowKind(RowKind.DELETE);
+            emit(record, delete, out, tableChange);
+        } else {
+            GenericRowData before = extractBeforeRow(value, valueSchema);
+            validator.validate(before, RowKind.UPDATE_BEFORE);
+            before.setRowKind(RowKind.UPDATE_BEFORE);
+            emit(record, before, out, tableChange);
+
+            GenericRowData after = extractAfterRow(value, valueSchema);
+            validator.validate(after, RowKind.UPDATE_AFTER);
+            after.setRowKind(RowKind.UPDATE_AFTER);
+            emit(record, after, out, tableChange);
+        }
+    }
+
+    /** Custom validator to validate the row value. */
+    public interface ValueValidator extends Serializable {
+        void validate(RowData rowData, RowKind rowKind) throws Exception;
+    }
+
+    /** TypeInformation of the produced {@link RowData}. */
+    private final TypeInformation<RowData> resultTypeInfo;
+
+    /**
+     * Runtime converter that converts Kafka {@link SourceRecord}s into {@link RowData} consisting of
+     * physical column values.
+     */
+    private final DeserializationRuntimeConverter physicalConverter;
+
+    /** Whether the deserializer needs to handle metadata columns. */
+    private final boolean hasMetadata;
+
+    /**
+     * A wrapped output collector which is used to append metadata columns after physical columns.
+     */
+    private final AppendMetadataCollector appendMetadataCollector;
+
+    /** Validator to validate the row value. */
+    private final ValueValidator validator;
+
+    /** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    RowDataDebeziumDeserializeSchema(
+            RowType physicalDataType,
+            MetadataConverter[] metadataConverters,
+            TypeInformation<RowData> resultTypeInfo,
+            ValueValidator validator,
+            ZoneId serverTimeZone,
+            DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
+        this.hasMetadata = checkNotNull(metadataConverters).length > 0;
+        this.appendMetadataCollector = new AppendMetadataCollector(metadataConverters);
+        this.physicalConverter =
+                createConverter(
+                        checkNotNull(physicalDataType),
+                        serverTimeZone,
+                        userDefinedConverterFactory);
+        this.resultTypeInfo = checkNotNull(resultTypeInfo);
+        this.validator = checkNotNull(validator);
+    }
+
+    private GenericRowData extractAfterRow(Struct value, Schema valueSchema) throws Exception {
+        Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema();
+        Struct after = value.getStruct(Envelope.FieldName.AFTER);
+        return (GenericRowData) physicalConverter.convert(after, afterSchema);
+    }
+
+    private GenericRowData extractBeforeRow(Struct value, Schema valueSchema) throws Exception {
+        Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema();
+        Struct before = value.getStruct(Envelope.FieldName.BEFORE);
+        return (GenericRowData) physicalConverter.convert(before, beforeSchema);
+    }
+
+    private void emit(SourceRecord inRecord, RowData physicalRow, Collector<RowData> collector,
+        TableChange tableChange) {
+        if (!hasMetadata) {
+            collector.collect(physicalRow);
+            return;
+        }
+
+        appendMetadataCollector.inputRecord = inRecord;
+        appendMetadataCollector.outputCollector = collector;
+        appendMetadataCollector.collect(physicalRow, tableChange);
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return resultTypeInfo;
+    }
+
+    // -------------------------------------------------------------------------------------
+    // Builder
+    // -------------------------------------------------------------------------------------
+
+    /** Builder of {@link RowDataDebeziumDeserializeSchema}. */
+    public static class Builder {
+        private RowType physicalRowType;
+        private TypeInformation<RowData> resultTypeInfo;
+        private MetadataConverter[] metadataConverters = new MetadataConverter[0];
+        private ValueValidator validator = (rowData, rowKind) -> {};
+        private ZoneId serverTimeZone = ZoneId.of("UTC");
+        private DeserializationRuntimeConverterFactory userDefinedConverterFactory =
+                DeserializationRuntimeConverterFactory.DEFAULT;
+
+        public Builder setPhysicalRowType(RowType physicalRowType) {
+            this.physicalRowType = physicalRowType;
+            return this;
+        }
+
+        public Builder setMetadataConverters(MetadataConverter[] metadataConverters) {
+            this.metadataConverters = metadataConverters;
+            return this;
+        }
+
+        public Builder setResultTypeInfo(TypeInformation<RowData> resultTypeInfo) {
+            this.resultTypeInfo = resultTypeInfo;
+            return this;
+        }
+
+        public Builder setValueValidator(ValueValidator validator) {
+            this.validator = validator;
+            return this;
+        }
+
+        public Builder setServerTimeZone(ZoneId serverTimeZone) {
+            this.serverTimeZone = serverTimeZone;
+            return this;
+        }
+
+        public Builder setUserDefinedConverterFactory(
+                DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
+            this.userDefinedConverterFactory = userDefinedConverterFactory;
+            return this;
+        }
+
+        public RowDataDebeziumDeserializeSchema build() {
+            return new RowDataDebeziumDeserializeSchema(
+                    physicalRowType,
+                    metadataConverters,
+                    resultTypeInfo,
+                    validator,
+                    serverTimeZone,
+                    userDefinedConverterFactory);
+        }
+    }
+
+    // -------------------------------------------------------------------------------------
+    // Runtime Converters
+    // -------------------------------------------------------------------------------------
+
+    /** Creates a runtime converter which is null safe. */
+    private static DeserializationRuntimeConverter createConverter(
+            LogicalType type,
+            ZoneId serverTimeZone,
+            DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
+        return wrapIntoNullableConverter(
+                createNotNullConverter(type, serverTimeZone, userDefinedConverterFactory));
+    }
+
+    // --------------------------------------------------------------------------------
+    // IMPORTANT! We use anonymous classes instead of lambdas for a reason here. It is
+    // necessary because the maven shade plugin cannot relocate classes in
+    // SerializedLambdas (MSHADE-260).
+    // --------------------------------------------------------------------------------
+
+    /** Creates a runtime converter which assumes the input object is not null. */
+    public static DeserializationRuntimeConverter createNotNullConverter(
+            LogicalType type,
+            ZoneId serverTimeZone,
+            DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
+        // user defined converter has a higher resolve order
+        Optional<DeserializationRuntimeConverter> converter =
+                userDefinedConverterFactory.createUserDefinedConverter(type, serverTimeZone);
+        if (converter.isPresent()) {
+            return converter.get();
+        }
+
+        // if no matched user defined converter, fallback to the default converter
+        switch (type.getTypeRoot()) {
+            case NULL:
+                return new DeserializationRuntimeConverter() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(Object dbzObj, Schema schema) {
+                        return null;
+                    }
+                };
+            case BOOLEAN:
+                return convertToBoolean();
+            case TINYINT:
+                return new DeserializationRuntimeConverter() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(Object dbzObj, Schema schema) {
+                        return Byte.parseByte(dbzObj.toString());
+                    }
+                };
+            case SMALLINT:
+                return new DeserializationRuntimeConverter() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(Object dbzObj, Schema schema) {
+                        return Short.parseShort(dbzObj.toString());
+                    }
+                };
+            case INTEGER:
+            case INTERVAL_YEAR_MONTH:
+                return convertToInt();
+            case BIGINT:
+            case INTERVAL_DAY_TIME:
+                return convertToLong();
+            case DATE:
+                return convertToDate();
+            case TIME_WITHOUT_TIME_ZONE:
+                return convertToTime();
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return convertToTimestamp(serverTimeZone);
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return convertToLocalTimeZoneTimestamp(serverTimeZone);
+            case FLOAT:
+                return convertToFloat();
+            case DOUBLE:
+                return convertToDouble();
+            case CHAR:
+            case VARCHAR:
+                return convertToString();
+            case BINARY:
+            case VARBINARY:
+                return convertToBinary();
+            case DECIMAL:
+                return createDecimalConverter((DecimalType) type);
+            case ROW:
+                return createRowConverter(
+                        (RowType) type, serverTimeZone, userDefinedConverterFactory);
+            case ARRAY:
+            case MAP:
+            case MULTISET:
+            case RAW:
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + type);
+        }
+    }
+
+    private static DeserializationRuntimeConverter convertToBoolean() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Boolean) {
+                    return dbzObj;
+                } else if (dbzObj instanceof Byte) {
+                    return (byte) dbzObj == 1;
+                } else if (dbzObj instanceof Short) {
+                    return (short) dbzObj == 1;
+                } else {
+                    return Boolean.parseBoolean(dbzObj.toString());
+                }
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToInt() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Integer) {
+                    return dbzObj;
+                } else if (dbzObj instanceof Long) {
+                    return ((Long) dbzObj).intValue();
+                } else {
+                    return Integer.parseInt(dbzObj.toString());
+                }
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToLong() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Integer) {
+                    return ((Integer) dbzObj).longValue();
+                } else if (dbzObj instanceof Long) {
+                    return dbzObj;
+                } else {
+                    return Long.parseLong(dbzObj.toString());
+                }
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToDouble() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Float) {
+                    return ((Float) dbzObj).doubleValue();
+                } else if (dbzObj instanceof Double) {
+                    return dbzObj;
+                } else {
+                    return Double.parseDouble(dbzObj.toString());
+                }
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToFloat() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Float) {
+                    return dbzObj;
+                } else if (dbzObj instanceof Double) {
+                    return ((Double) dbzObj).floatValue();
+                } else {
+                    return Float.parseFloat(dbzObj.toString());
+                }
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToDate() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                return (int) TemporalConversions.toLocalDate(dbzObj).toEpochDay();
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToTime() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Long) {
+                    switch (schema.name()) {
+                        case MicroTime.SCHEMA_NAME:
+                            return (int) ((long) dbzObj / 1000);
+                        case NanoTime.SCHEMA_NAME:
+                            return (int) ((long) dbzObj / 1000_000);
+                    }
+                } else if (dbzObj instanceof Integer) {
+                    return dbzObj;
+                }
+                // get number of milliseconds of the day
+                return TemporalConversions.toLocalTime(dbzObj).toSecondOfDay() * 1000;
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToTimestamp(ZoneId serverTimeZone) {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Long) {
+                    switch (schema.name()) {
+                        case Timestamp.SCHEMA_NAME:
+                            return TimestampData.fromEpochMillis((Long) dbzObj);
+                        case MicroTimestamp.SCHEMA_NAME:
+                            long micro = (long) dbzObj;
+                            return TimestampData.fromEpochMillis(
+                                    micro / 1000, (int) (micro % 1000 * 1000));
+                        case NanoTimestamp.SCHEMA_NAME:
+                            long nano = (long) dbzObj;
+                            return TimestampData.fromEpochMillis(
+                                    nano / 1000_000, (int) (nano % 1000_000));
+                    }
+                }
+                LocalDateTime localDateTime =
+                        TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
+                return TimestampData.fromLocalDateTime(localDateTime);
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToLocalTimeZoneTimestamp(
+            ZoneId serverTimeZone) {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof String) {
+                    String str = (String) dbzObj;
+                    // TIMESTAMP_LTZ type is encoded in string type
+                    Instant instant = Instant.parse(str);
+                    return TimestampData.fromLocalDateTime(
+                            LocalDateTime.ofInstant(instant, serverTimeZone));
+                }
+                throw new IllegalArgumentException(
+                        "Unable to convert to TimestampData from unexpected value '"
+                                + dbzObj
+                                + "' of type "
+                                + dbzObj.getClass().getName());
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToString() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                return StringData.fromString(dbzObj.toString());
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToBinary() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof byte[]) {
+                    return dbzObj;
+                } else if (dbzObj instanceof ByteBuffer) {
+                    ByteBuffer byteBuffer = (ByteBuffer) dbzObj;
+                    byte[] bytes = new byte[byteBuffer.remaining()];
+                    byteBuffer.get(bytes);
+                    return bytes;
+                } else {
+                    throw new UnsupportedOperationException(
+                            "Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName());
+                }
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter createDecimalConverter(DecimalType decimalType) {
+        final int precision = decimalType.getPrecision();
+        final int scale = decimalType.getScale();
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                BigDecimal bigDecimal;
+                if (dbzObj instanceof byte[]) {
+                    // decimal.handling.mode=precise
+                    bigDecimal = Decimal.toLogical(schema, (byte[]) dbzObj);
+                } else if (dbzObj instanceof String) {
+                    // decimal.handling.mode=string
+                    bigDecimal = new BigDecimal((String) dbzObj);
+                } else if (dbzObj instanceof Double) {
+                    // decimal.handling.mode=double
+                    bigDecimal = BigDecimal.valueOf((Double) dbzObj);
+                } else {
+                    if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {
+                        SpecialValueDecimal decimal =
+                                VariableScaleDecimal.toLogical((Struct) dbzObj);
+                        bigDecimal = decimal.getDecimalValue().orElse(BigDecimal.ZERO);
+                    } else {
+                        // fallback to string
+                        bigDecimal = new BigDecimal(dbzObj.toString());
+                    }
+                }
+                return DecimalData.fromBigDecimal(bigDecimal, precision, scale);
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter createRowConverter(
+            RowType rowType,
+            ZoneId serverTimeZone,
+            DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
+        final DeserializationRuntimeConverter[] fieldConverters =
+                rowType.getFields().stream()
+                        .map(RowType.RowField::getType)
+                        .map(
+                                logicType ->
+                                        createConverter(
+                                                logicType,
+                                                serverTimeZone,
+                                                userDefinedConverterFactory))
+                        .toArray(DeserializationRuntimeConverter[]::new);
+        final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
+
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) throws Exception {
+                Struct struct = (Struct) dbzObj;
+                int arity = fieldNames.length;
+                GenericRowData row = new GenericRowData(arity);
+                for (int i = 0; i < arity; i++) {
+                    String fieldName = fieldNames[i];
+                    Field field = schema.field(fieldName);
+                    if (field == null) {
+                        row.setField(i, null);
+                    } else {
+                        Object fieldValue = struct.getWithoutDefault(fieldName);
+                        Schema fieldSchema = schema.field(fieldName).schema();
+                        Object convertedField =
+                                convertField(fieldConverters[i], fieldValue, fieldSchema);
+                        row.setField(i, convertedField);
+                    }
+                }
+                return row;
+            }
+        };
+    }
+
+    private static Object convertField(
+            DeserializationRuntimeConverter fieldConverter, Object fieldValue, Schema fieldSchema)
+            throws Exception {
+        if (fieldValue == null) {
+            return null;
+        } else {
+            return fieldConverter.convert(fieldValue, fieldSchema);
+        }
+    }
+
+    private static DeserializationRuntimeConverter wrapIntoNullableConverter(
+            DeserializationRuntimeConverter converter) {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) throws Exception {
+                if (dbzObj == null) {
+                    return null;
+                }
+                return converter.convert(dbzObj, schema);
+            }
+        };
+    }
+}
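
A minimal sketch of building this deserialization schema through the builder, assuming a simple (id BIGINT, name STRING) physical schema and no metadata columns; InternalTypeInfo comes from the Flink blink table runtime.

    import java.time.ZoneId;
    import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
    import org.apache.flink.table.types.logical.BigIntType;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.logical.VarCharType;
    import org.apache.inlong.sort.singletenant.flink.cdc.debezium.table.RowDataDebeziumDeserializeSchema;

    public class DeserializeSchemaExample {
        public static RowDataDebeziumDeserializeSchema build() {
            // hypothetical physical schema: (id BIGINT, name STRING)
            RowType physicalType =
                    RowType.of(
                            new LogicalType[] {new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH)},
                            new String[] {"id", "name"});
            return RowDataDebeziumDeserializeSchema.newBuilder()
                    .setPhysicalRowType(physicalType)
                    .setResultTypeInfo(InternalTypeInfo.of(physicalType))
                    .setServerTimeZone(ZoneId.of("UTC"))
                    .build();
        }
    }

The resulting schema can then be handed to the DebeziumSourceFunction as its DebeziumDeserializationSchema<RowData>.
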
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/utils/DatabaseHistoryUtil.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/utils/DatabaseHistoryUtil.java
new file mode 100644
index 000000000..b00d12024
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/utils/DatabaseHistoryUtil.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils;
+
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.DebeziumSourceFunction;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.SchemaRecord;
+import io.debezium.relational.history.DatabaseHistory;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Util to safely visit schema history between {@link DatabaseHistory} and {@link
+ * DebeziumSourceFunction}.
+ */
+public class DatabaseHistoryUtil {
+
+    private DatabaseHistoryUtil() {
+        // do nothing
+    }
+
+    /**
+     * Structure to maintain the current schema history. The content in {@link SchemaRecord} is up
+     * to the implementation of the {@link DatabaseHistory}.
+     */
+    private static final Map<String, Collection<SchemaRecord>> HISTORY = new HashMap<>();
+
+    /**
+     * The schema history will be cleaned up once {@link DatabaseHistory#stop()} is called; the
+     * checkpoint should fail when this happens.
+     */
+    private static final Map<String, Boolean> HISTORY_CLEANUP_STATUS = new HashMap<>();
+
+    /** Registers history of schema safely. */
+    public static void registerHistory(String engineName, Collection<SchemaRecord> engineHistory) {
+        synchronized (HISTORY) {
+            HISTORY.put(engineName, engineHistory);
+            HISTORY_CLEANUP_STATUS.put(engineName, false);
+        }
+    }
+
+    /** Removes history of schema safely. */
+    public static void removeHistory(String engineName) {
+        synchronized (HISTORY) {
+            HISTORY_CLEANUP_STATUS.put(engineName, true);
+            HISTORY.remove(engineName);
+        }
+    }
+
+    /**
+     * Retrieves history of schema safely. This method first checks the history status of the
+     * specific engine and then returns the schema history if it has not been cleaned up yet; it
+     * throws an {@link IllegalStateException} when the history has already been cleaned up.
+     */
+    public static Collection<SchemaRecord> retrieveHistory(String engineName) {
+        synchronized (HISTORY) {
+            if (Boolean.TRUE.equals(HISTORY_CLEANUP_STATUS.get(engineName))) {
+                throw new IllegalStateException(
+                        String.format(
+                                "Retrieve schema history failed, the schema records for engine %s have been removed,"
+                                        + " this might be because the debezium engine "
+                                        + "has been shut down due to other errors.",
+                                engineName));
+            } else {
+                return HISTORY.getOrDefault(engineName, Collections.emptyList());
+            }
+        }
+    }
+}
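
A minimal sketch of the intended lifecycle, with a hypothetical engine name; registration happens when the Debezium engine starts, retrieval during state snapshots, and removal when the engine stops.

    import java.util.Collection;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.SchemaRecord;
    import org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.DatabaseHistoryUtil;

    public class DatabaseHistoryUtilExample {
        public static void main(String[] args) {
            String engineName = "mysql-cdc-engine-1"; // hypothetical engine instance name
            Collection<SchemaRecord> records = new ConcurrentLinkedQueue<>();

            DatabaseHistoryUtil.registerHistory(engineName, records);  // engine start
            Collection<SchemaRecord> history =
                    DatabaseHistoryUtil.retrieveHistory(engineName);   // e.g. during snapshotState()
            System.out.println(history.size());

            DatabaseHistoryUtil.removeHistory(engineName);             // engine stop
            // a later retrieveHistory(engineName) now throws IllegalStateException
        }
    }
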
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/utils/TemporalConversions.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/utils/TemporalConversions.java
new file mode 100644
index 000000000..24f55b259
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/utils/TemporalConversions.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.concurrent.TimeUnit;
+
+/** Temporal conversion utilities and constants. */
+public final class TemporalConversions {
+
+    static final long MILLISECONDS_PER_SECOND = TimeUnit.SECONDS.toMillis(1);
+    static final long MICROSECONDS_PER_SECOND = TimeUnit.SECONDS.toMicros(1);
+    static final long MICROSECONDS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toMicros(1);
+    static final long NANOSECONDS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1);
+    static final long NANOSECONDS_PER_MICROSECOND = TimeUnit.MICROSECONDS.toNanos(1);
+    static final long NANOSECONDS_PER_SECOND = TimeUnit.SECONDS.toNanos(1);
+    static final long NANOSECONDS_PER_DAY = TimeUnit.DAYS.toNanos(1);
+    static final long SECONDS_PER_DAY = TimeUnit.DAYS.toSeconds(1);
+    static final long MICROSECONDS_PER_DAY = TimeUnit.DAYS.toMicros(1);
+    static final LocalDate EPOCH = LocalDate.ofEpochDay(0);
+
+    private TemporalConversions() {
+    }
+
+    public static LocalDate toLocalDate(Object obj) {
+        if (obj == null) {
+            return null;
+        }
+        if (obj instanceof LocalDate) {
+            return (LocalDate) obj;
+        }
+        if (obj instanceof LocalDateTime) {
+            return ((LocalDateTime) obj).toLocalDate();
+        }
+        if (obj instanceof java.sql.Date) {
+            return ((java.sql.Date) obj).toLocalDate();
+        }
+        if (obj instanceof java.sql.Time) {
+            throw new IllegalArgumentException(
+                    "Unable to convert to LocalDate from a java.sql.Time value '" + obj + "'");
+        }
+        if (obj instanceof java.util.Date) {
+            java.util.Date date = (java.util.Date) obj;
+            return LocalDate.of(date.getYear() + 1900, date.getMonth() + 1, date.getDate());
+        }
+        if (obj instanceof Long) {
+            // Assume the value is the epoch day number
+            return LocalDate.ofEpochDay((Long) obj);
+        }
+        if (obj instanceof Integer) {
+            // Assume the value is the epoch day number
+            return LocalDate.ofEpochDay((Integer) obj);
+        }
+        throw new IllegalArgumentException(
+                "Unable to convert to LocalDate from unexpected value '"
+                        + obj
+                        + "' of type "
+                        + obj.getClass().getName());
+    }
+
+    public static LocalTime toLocalTime(Object obj) {
+        if (obj == null) {
+            return null;
+        }
+        if (obj instanceof LocalTime) {
+            return (LocalTime) obj;
+        }
+        if (obj instanceof LocalDateTime) {
+            return ((LocalDateTime) obj).toLocalTime();
+        }
+        if (obj instanceof java.sql.Date) {
+            throw new IllegalArgumentException(
+                    "Unable to convert to LocalTime from a java.sql.Date value '" + obj + "'");
+        }
+        if (obj instanceof java.sql.Time) {
+            java.sql.Time time = (java.sql.Time) obj;
+            long millis = (int) (time.getTime() % MILLISECONDS_PER_SECOND);
+            int nanosOfSecond = (int) (millis * NANOSECONDS_PER_MILLISECOND);
+            return LocalTime.of(
+                    time.getHours(), time.getMinutes(), time.getSeconds(), nanosOfSecond);
+        }
+        if (obj instanceof java.sql.Timestamp) {
+            java.sql.Timestamp timestamp = (java.sql.Timestamp) obj;
+            return LocalTime.of(
+                    timestamp.getHours(),
+                    timestamp.getMinutes(),
+                    timestamp.getSeconds(),
+                    timestamp.getNanos());
+        }
+        if (obj instanceof java.util.Date) {
+            java.util.Date date = (java.util.Date) obj;
+            long millis = (int) (date.getTime() % MILLISECONDS_PER_SECOND);
+            int nanosOfSecond = (int) (millis * NANOSECONDS_PER_MILLISECOND);
+            return LocalTime.of(
+                    date.getHours(), date.getMinutes(), date.getSeconds(), nanosOfSecond);
+        }
+        if (obj instanceof Duration) {
+            Long value = ((Duration) obj).toNanos();
+            if (value >= 0 && value <= NANOSECONDS_PER_DAY) {
+                return LocalTime.ofNanoOfDay(value);
+            } else {
+                throw new IllegalArgumentException(
+                        "Time values must use a number of nanoseconds between 0 and 86400000000000");
+            }
+        }
+        throw new IllegalArgumentException(
+                "Unable to convert to LocalTime from unexpected value '"
+                        + obj
+                        + "' of type "
+                        + obj.getClass().getName());
+    }
+
+    public static LocalDateTime toLocalDateTime(Object obj, ZoneId serverTimeZone) {
+        if (obj == null) {
+            return null;
+        }
+        if (obj instanceof OffsetDateTime) {
+            return ((OffsetDateTime) obj).toLocalDateTime();
+        }
+        if (obj instanceof Instant) {
+            return ((Instant) obj).atOffset(ZoneOffset.UTC).toLocalDateTime();
+        }
+        if (obj instanceof LocalDateTime) {
+            return (LocalDateTime) obj;
+        }
+        if (obj instanceof LocalDate) {
+            LocalDate date = (LocalDate) obj;
+            return LocalDateTime.of(date, LocalTime.MIDNIGHT);
+        }
+        if (obj instanceof LocalTime) {
+            LocalTime time = (LocalTime) obj;
+            return LocalDateTime.of(EPOCH, time);
+        }
+        if (obj instanceof java.sql.Date) {
+            java.sql.Date sqlDate = (java.sql.Date) obj;
+            LocalDate date = sqlDate.toLocalDate();
+            return LocalDateTime.of(date, LocalTime.MIDNIGHT);
+        }
+        if (obj instanceof java.sql.Time) {
+            LocalTime localTime = toLocalTime(obj);
+            return LocalDateTime.of(EPOCH, localTime);
+        }
+        if (obj instanceof java.sql.Timestamp) {
+            java.sql.Timestamp timestamp = (java.sql.Timestamp) obj;
+            return LocalDateTime.of(
+                    timestamp.getYear() + 1900,
+                    timestamp.getMonth() + 1,
+                    timestamp.getDate(),
+                    timestamp.getHours(),
+                    timestamp.getMinutes(),
+                    timestamp.getSeconds(),
+                    timestamp.getNanos());
+        }
+        if (obj instanceof java.util.Date) {
+            java.util.Date date = (java.util.Date) obj;
+            long millis = (int) (date.getTime() % MILLISECONDS_PER_SECOND);
+            if (millis < 0) {
+                millis = MILLISECONDS_PER_SECOND + millis;
+            }
+            int nanosOfSecond = (int) (millis * NANOSECONDS_PER_MILLISECOND);
+            return LocalDateTime.of(
+                    date.getYear() + 1900,
+                    date.getMonth() + 1,
+                    date.getDate(),
+                    date.getHours(),
+                    date.getMinutes(),
+                    date.getSeconds(),
+                    nanosOfSecond);
+        }
+        if (obj instanceof String) {
+            String str = (String) obj;
+            // TIMESTAMP type is encoded in string type
+            Instant instant = Instant.parse(str);
+            return LocalDateTime.ofInstant(instant, serverTimeZone);
+        }
+        throw new IllegalArgumentException(
+                "Unable to convert to LocalDateTime from unexpected value '"
+                        + obj
+                        + "' of type "
+                        + obj.getClass().getName());
+    }
+
+    public static long toEpochMicros(Instant instant) {
+        return TimeUnit.SECONDS.toMicros(instant.getEpochSecond())
+                + TimeUnit.NANOSECONDS.toMicros(instant.getNano());
+    }
+
+    public static Instant toInstantFromMicros(long microsSinceEpoch) {
+        return Instant.ofEpochSecond(
+                TimeUnit.MICROSECONDS.toSeconds(microsSinceEpoch),
+                TimeUnit.MICROSECONDS.toNanos(microsSinceEpoch % TimeUnit.SECONDS.toMicros(1)));
+    }
+}
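
A few hedged usage examples of these conversions; the literal inputs are illustrative (Debezium encodes DATE values as epoch-day integers, and the converters above expect TIMESTAMP_LTZ values as ISO-8601 strings).

    import java.time.Instant;
    import java.time.LocalDateTime;
    import java.time.ZoneId;
    import org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.TemporalConversions;

    public class TemporalConversionsExample {
        public static void main(String[] args) {
            // epoch-day integer -> LocalDate
            System.out.println(TemporalConversions.toLocalDate(19000));

            // ISO-8601 instant string interpreted in the given server time zone
            LocalDateTime ldt =
                    TemporalConversions.toLocalDateTime("2022-04-26T02:30:53Z", ZoneId.of("UTC"));
            System.out.println(ldt);

            // microsecond round trip
            long micros = TemporalConversions.toEpochMicros(Instant.now());
            System.out.println(TemporalConversions.toInstantFromMicros(micros));
        }
    }
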
diff --git a/pom.xml b/pom.xml
index 9280ea395..dc8af3bae 100644
--- a/pom.xml
+++ b/pom.xml
@@ -172,6 +172,9 @@
         <opencsv.version>5.4</opencsv.version>
         <javax.servlet.api.version>4.0.1</javax.servlet.api.version>
 
+        <flink-table-planner-blink_2.11.version>1.13.5</flink-table-planner-blink_2.11.version>
+        <flink-connector-mysql-cdc.version>2.0.2</flink-connector-mysql-cdc.version>
+
         <gson.version>2.8.6</gson.version>
         <jackson.version>2.13.2</jackson.version>
         <jackson.databind.version>2.13.2.2</jackson.databind.version>
@@ -224,6 +227,8 @@
         <jcommander.version>1.78</jcommander.version>
         <je.version>7.3.7</je.version>
         <tencentcloud.cls.version>1.0.5</tencentcloud.cls.version>
+        <esri-geometry-api.version>2.0.0</esri-geometry-api.version>
+        <HikariCP.version>4.0.3</HikariCP.version>
     </properties>
 
     <dependencyManagement>
@@ -239,6 +244,16 @@
                 <artifactId>flume-ng-node</artifactId>
                 <version>${flume.version}</version>
             </dependency>
+            <dependency>
+                <groupId>com.ververica</groupId>
+                <artifactId>flink-connector-mysql-cdc</artifactId>
+                <version>${flink-connector-mysql-cdc.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-table-planner-blink_2.11</artifactId>
+                <version>${flink-table-planner-blink_2.11.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.apache.flume</groupId>
                 <artifactId>flume-ng-sdk</artifactId>
@@ -249,6 +264,17 @@
                 <artifactId>flume-ng-configuration</artifactId>
                 <version>${flume.version}</version>
             </dependency>
+            <dependency>
+                <groupId>com.zaxxer</groupId>
+                <artifactId>HikariCP</artifactId>
+                <version>${HikariCP.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.slf4j</groupId>
+                        <artifactId>slf4j-api</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
             <dependency>
                 <groupId>org.apache.flume.flume-ng-sinks</groupId>
                 <artifactId>flume-hdfs-sink</artifactId>
@@ -1274,7 +1300,11 @@
                 <artifactId>jcommander</artifactId>
                 <version>${jcommander.version}</version>
             </dependency>
-
+            <dependency>
+                <groupId>com.esri.geometry</groupId>
+                <artifactId>esri-geometry-api</artifactId>
+                <version>${esri-geometry-api.version}</version>
+            </dependency>
             <dependency>
                 <groupId>com.tencentcloudapi.cls</groupId>
                 <artifactId>tencentcloud-cls-sdk-java</artifactId>