Posted to commits@inlong.apache.org by zi...@apache.org on 2022/08/15 02:46:42 UTC

[inlong] branch master updated: [INLONG-5156][Sort] Add metric for SQLServer source with Flink metrics group (#5329)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4885bfd37 [INLONG-5156][Sort] Add metric for SQLServer source with Flink metrics group (#5329)
4885bfd37 is described below

commit 4885bfd373da02894b6a68d6132ed5705214ed8d
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Mon Aug 15 10:46:37 2022 +0800

    [INLONG-5156][Sort] Add metric for SQLServer source with Flink metrics group (#5329)
---
 inlong-sort/sort-connectors/sqlserver-cdc/pom.xml  |  11 +
 .../inlong/sort/cdc/sqlserver/SqlServerSource.java | 174 ++++++
 .../sqlserver/table/DebeziumSourceFunction.java    | 658 +++++++++++++++++++++
 .../cdc/sqlserver/table/SqlServerTableFactory.java | 196 ++++++
 .../cdc/sqlserver/table/SqlServerTableSource.java  | 257 ++++++++
 .../org.apache.flink.table.factories.Factory       |  16 +
 licenses/inlong-sort-connectors/LICENSE            |   7 +
 7 files changed, 1319 insertions(+)

diff --git a/inlong-sort/sort-connectors/sqlserver-cdc/pom.xml b/inlong-sort/sort-connectors/sqlserver-cdc/pom.xml
index 1127b9f45..4073341e8 100644
--- a/inlong-sort/sort-connectors/sqlserver-cdc/pom.xml
+++ b/inlong-sort/sort-connectors/sqlserver-cdc/pom.xml
@@ -36,6 +36,16 @@
             <groupId>com.ververica</groupId>
             <artifactId>flink-connector-sqlserver-cdc</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
@@ -53,6 +63,7 @@
                         <configuration>
                             <artifactSet>
                                 <includes>
+                                    <include>org.apache.inlong:*</include>
                                     <include>io.debezium:debezium-api</include>
                                     <include>io.debezium:debezium-embedded</include>
                                     <include>io.debezium:debezium-core</include>
diff --git a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/SqlServerSource.java b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/SqlServerSource.java
new file mode 100644
index 000000000..3dbdd769b
--- /dev/null
+++ b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/SqlServerSource.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.sqlserver;
+
+import com.ververica.cdc.connectors.sqlserver.SqlServerValidator;
+import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
+import org.apache.inlong.sort.cdc.sqlserver.table.DebeziumSourceFunction;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import io.debezium.connector.sqlserver.SqlServerConnector;
+
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A builder to build a SourceFunction which can read a snapshot and then continue to consume the
+ * SQL Server transaction log.
+ */
+public class SqlServerSource {
+
+    private static final String DATABASE_SERVER_NAME = "sqlserver_transaction_log_source";
+
+    public static <T> Builder<T> builder() {
+        return new Builder<>();
+    }
+
+    /** Builder class of {@link SqlServerSource}. */
+    public static class Builder<T> {
+
+        private int port = 1433; // default 1433 port
+        private String hostname;
+        private String database;
+        private String username;
+        private String password;
+        private String[] tableList;
+        private Properties dbzProperties;
+        private StartupOptions startupOptions = StartupOptions.initial();
+        private DebeziumDeserializationSchema<T> deserializer;
+        private String inlongMetric;
+        private String auditHostAndPorts;
+
+        public Builder<T> hostname(String hostname) {
+            this.hostname = hostname;
+            return this;
+        }
+
+        /** Integer port number of the SQL Server database server. */
+        public Builder<T> port(int port) {
+            this.port = port;
+            return this;
+        }
+
+        /** The name of the SQL Server database from which to stream the changes. */
+        public Builder<T> database(String database) {
+            this.database = database;
+            return this;
+        }
+
+        /**
+         * An optional comma-separated list of regular expressions that match fully-qualified table
+         * identifiers for tables that you want Debezium to capture; any table that is not included
+         * in table.include.list is excluded from capture. Each identifier is of the form
+         * schemaName.tableName. By default, the connector captures all non-system tables for the
+         * designated schemas. Must not be used with table.exclude.list.
+         */
+        public Builder<T> tableList(String... tableList) {
+            this.tableList = tableList;
+            return this;
+        }
+
+        /** Username to use when connecting to the SQL Server database server. */
+        public Builder<T> username(String username) {
+            this.username = username;
+            return this;
+        }
+
+        /** Password to use when connecting to the SQL Server database server. */
+        public Builder<T> password(String password) {
+            this.password = password;
+            return this;
+        }
+
+        /** The Debezium SqlServer connector properties. For example, "snapshot.mode". */
+        public Builder<T> debeziumProperties(Properties properties) {
+            this.dbzProperties = properties;
+            return this;
+        }
+
+        /**
+         * The deserializer used to convert from consumed {@link
+         * org.apache.kafka.connect.source.SourceRecord}.
+         */
+        public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
+            this.deserializer = deserializer;
+            return this;
+        }
+
+        public Builder<T> inlongMetric(String inlongMetric) {
+            this.inlongMetric = inlongMetric;
+            return this;
+        }
+
+        public Builder<T> auditHostAndPorts(String auditHostAndPorts) {
+            this.auditHostAndPorts = auditHostAndPorts;
+            return this;
+        }
+
+        /** Specifies the startup options. */
+        public Builder<T> startupOptions(StartupOptions startupOptions) {
+            this.startupOptions = startupOptions;
+            return this;
+        }
+
+        public DebeziumSourceFunction<T> build() {
+            Properties props = new Properties();
+            props.setProperty("connector.class", SqlServerConnector.class.getCanonicalName());
+            // Hard-code the server name because we don't need to distinguish it. From the Debezium docs:
+            // Logical name that identifies and provides a namespace for the SQL Server database
+            // server that you want Debezium to capture. The logical name should be unique across
+            // all other connectors, since it is used as a prefix for all Kafka topic names
+            // emanating from this connector. Only alphanumeric characters and underscores should be
+            // used.
+            props.setProperty("database.server.name", DATABASE_SERVER_NAME);
+            props.setProperty("database.hostname", checkNotNull(hostname));
+            props.setProperty("database.user", checkNotNull(username));
+            props.setProperty("database.password", checkNotNull(password));
+            props.setProperty("database.port", String.valueOf(port));
+            props.setProperty("database.history.skip.unparseable.ddl", String.valueOf(true));
+            props.setProperty("database.dbname", checkNotNull(database));
+
+            if (tableList != null) {
+                props.setProperty("table.include.list", String.join(",", tableList));
+            }
+
+            switch (startupOptions.startupMode) {
+                case INITIAL:
+                    props.setProperty("snapshot.mode", "initial");
+                    break;
+                case INITIAL_ONLY:
+                    props.setProperty("snapshot.mode", "initial_only");
+                    break;
+                case LATEST_OFFSET:
+                    props.setProperty("snapshot.mode", "schema_only");
+                    break;
+                default:
+                    throw new UnsupportedOperationException();
+            }
+
+            if (dbzProperties != null) {
+                props.putAll(dbzProperties);
+            }
+
+            return new DebeziumSourceFunction<>(
+                    deserializer, props, null, new SqlServerValidator(props),
+                    inlongMetric, auditHostAndPorts);
+        }
+    }
+}
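
The builder added above follows the upstream Ververica SqlServerSource builder and adds the inlongMetric and auditHostAndPorts options. A minimal usage sketch (not part of the commit) is shown below; the connection values, the metric label ("groupId&streamId&nodeId", assuming '&' as the delimiter defined in sort-connector-base) and the audit endpoint are illustrative assumptions.

    import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.inlong.sort.cdc.sqlserver.SqlServerSource;
    import org.apache.inlong.sort.cdc.sqlserver.table.DebeziumSourceFunction;

    public class SqlServerSourceExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // All connection values below are placeholders.
            DebeziumSourceFunction<String> source = SqlServerSource.<String>builder()
                    .hostname("localhost")
                    .port(1433)
                    .database("inventory")
                    .tableList("dbo.products")                    // schemaName.tableName
                    .username("sa")
                    .password("Password!")
                    .deserializer(new JsonDebeziumDeserializationSchema())
                    .startupOptions(StartupOptions.initial())
                    .inlongMetric("my_group&my_stream&my_node")   // groupId&streamId&nodeId labels
                    .auditHostAndPorts("audit-host:10081")        // optional InLong audit proxy
                    .build();

            env.addSource(source).print();
            env.execute("sqlserver-cdc-with-inlong-metrics");
        }
    }
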
diff --git a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java
new file mode 100644
index 000000000..c1cea9dba
--- /dev/null
+++ b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java
@@ -0,0 +1,658 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.sqlserver.table;
+
+import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
+import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
+
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.Validator;
+import com.ververica.cdc.debezium.internal.DebeziumChangeConsumer;
+import com.ververica.cdc.debezium.internal.DebeziumChangeFetcher;
+import com.ververica.cdc.debezium.internal.DebeziumOffset;
+import com.ververica.cdc.debezium.internal.DebeziumOffsetSerializer;
+import com.ververica.cdc.debezium.internal.FlinkDatabaseHistory;
+import com.ververica.cdc.debezium.internal.FlinkDatabaseSchemaHistory;
+import com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore;
+import com.ververica.cdc.debezium.internal.Handover;
+import com.ververica.cdc.debezium.internal.SchemaRecord;
+import io.debezium.document.DocumentReader;
+import io.debezium.document.DocumentWriter;
+import io.debezium.embedded.Connect;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.engine.spi.OffsetCommitPolicy;
+import io.debezium.heartbeat.Heartbeat;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.commons.collections.map.LinkedMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data
+ * from databases into Flink.
+ *
+ * <p>Please refer to Debezium's documentation for the available configuration properties:
+ * https://debezium.io/documentation/reference/1.5/development/engine.html#engine-properties
+ */
+@PublicEvolving
+public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
+        implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<T> {
+
+    private static final long serialVersionUID = -5808108641062931623L;
+
+    protected static final Logger LOG = LoggerFactory.getLogger(DebeziumSourceFunction.class);
+
+    /**
+     * State name of the consumer's partition offset states.
+     */
+    public static final String OFFSETS_STATE_NAME = "offset-states";
+
+    /**
+     * State name of the consumer's history records state.
+     */
+    public static final String HISTORY_RECORDS_STATE_NAME = "history-records-states";
+
+    /**
+     * The maximum number of pending non-committed checkpoints to track, to avoid memory leaks.
+     */
+    public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
+
+    /**
+     * The configuration key that indicates whether the Debezium connector uses the legacy
+     * implementation or not.
+     */
+    public static final String LEGACY_IMPLEMENTATION_KEY = "internal.implementation";
+
+    /**
+     * The configuration value represents legacy implementation.
+     */
+    public static final String LEGACY_IMPLEMENTATION_VALUE = "legacy";
+
+    // ---------------------------------------------------------------------------------------
+    // Properties
+    // ---------------------------------------------------------------------------------------
+
+    /**
+     * The schema to convert from Debezium's messages into Flink's objects.
+     */
+    private final DebeziumDeserializationSchema<T> deserializer;
+
+    /**
+     * User-supplied properties for Kafka.
+     */
+    private final Properties properties;
+
+    /**
+     * The specific offset to read from on the first startup.
+     */
+    private final @Nullable
+    DebeziumOffset specificOffset;
+
+    /**
+     * Data for pending but uncommitted offsets.
+     */
+    private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
+
+    /**
+     * Flag indicating whether the Debezium Engine is started.
+     */
+    private volatile boolean debeziumStarted = false;
+
+    /**
+     * Validator that checks the connected database satisfies the CDC connector's requirements.
+     */
+    private final Validator validator;
+
+    // ---------------------------------------------------------------------------------------
+    // State
+    // ---------------------------------------------------------------------------------------
+
+    /**
+     * The offsets to restore to, if the consumer restores state from a checkpoint.
+     *
+     * <p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)}
+     * method.
+     *
+     * <p>Using a String because we are encoding the offset state in JSON bytes.
+     */
+    private transient volatile String restoredOffsetState;
+
+    /**
+     * Accessor for state in the operator state backend.
+     */
+    private transient ListState<byte[]> offsetState;
+
+    /**
+     * State to store the history records, i.e. schema changes.
+     *
+     * @see FlinkDatabaseHistory
+     * @see FlinkDatabaseSchemaHistory
+     */
+    private transient ListState<String> schemaRecordsState;
+
+    // ---------------------------------------------------------------------------------------
+    // Worker
+    // ---------------------------------------------------------------------------------------
+
+    private transient ExecutorService executor;
+    private transient DebeziumEngine<?> engine;
+    /**
+     * Unique name of this Debezium Engine instance across all the jobs. Currently we randomly
+     * generate a UUID for it. This is used for {@link FlinkDatabaseHistory}.
+     */
+    private transient String engineInstanceName;
+
+    /**
+     * Consume the events from the engine and commit the offset to the engine.
+     */
+    private transient DebeziumChangeConsumer changeConsumer;
+
+    /**
+     * The consumer to fetch records from {@link Handover}.
+     */
+    private transient DebeziumChangeFetcher<T> debeziumChangeFetcher;
+
+    /**
+     * Buffer the events from the source and record the errors from the Debezium engine.
+     */
+    private transient Handover handover;
+
+    private String inlongMetric;
+
+    private SourceMetricData metricData;
+
+    private String inLongGroupId;
+
+    private String auditHostAndPorts;
+
+    private String inLongStreamId;
+
+    private transient AuditImp auditImp;
+
+    // ---------------------------------------------------------------------------------------
+
+    public DebeziumSourceFunction(
+            DebeziumDeserializationSchema<T> deserializer,
+            Properties properties,
+            @Nullable DebeziumOffset specificOffset,
+            Validator validator, String inlongMetric,
+            String auditHostAndPorts) {
+        this.deserializer = deserializer;
+        this.properties = properties;
+        this.specificOffset = specificOffset;
+        this.validator = validator;
+        this.inlongMetric = inlongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        validator.validate();
+        super.open(parameters);
+        ThreadFactory threadFactory =
+                new ThreadFactoryBuilder().setNameFormat("debezium-engine").build();
+        this.executor = Executors.newSingleThreadExecutor(threadFactory);
+        this.handover = new Handover();
+        this.changeConsumer = new DebeziumChangeConsumer(handover);
+    }
+
+    // ------------------------------------------------------------------------
+    //  Checkpoint and restore
+    // ------------------------------------------------------------------------
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        OperatorStateStore stateStore = context.getOperatorStateStore();
+        this.offsetState =
+                stateStore.getUnionListState(
+                        new ListStateDescriptor<>(
+                                OFFSETS_STATE_NAME,
+                                PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO));
+        this.schemaRecordsState =
+                stateStore.getUnionListState(
+                        new ListStateDescriptor<>(
+                                HISTORY_RECORDS_STATE_NAME, BasicTypeInfo.STRING_TYPE_INFO));
+
+        if (context.isRestored()) {
+            restoreOffsetState();
+            restoreHistoryRecordsState();
+        } else {
+            if (specificOffset != null) {
+                byte[] serializedOffset =
+                        DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
+                restoredOffsetState = new String(serializedOffset, StandardCharsets.UTF_8);
+                LOG.info(
+                        "Consumer subtask {} starts to read from specified offset {}.",
+                        getRuntimeContext().getIndexOfThisSubtask(),
+                        restoredOffsetState);
+            } else {
+                LOG.info(
+                        "Consumer subtask {} has no restore state.",
+                        getRuntimeContext().getIndexOfThisSubtask());
+            }
+        }
+    }
+
+    private void restoreOffsetState() throws Exception {
+        for (byte[] serializedOffset : offsetState.get()) {
+            if (restoredOffsetState == null) {
+                restoredOffsetState = new String(serializedOffset, StandardCharsets.UTF_8);
+            } else {
+                throw new RuntimeException(
+                        "Debezium Source only support single task, "
+                                + "however, this is restored from multiple tasks.");
+            }
+        }
+        LOG.info(
+                "Consumer subtask {} restored offset state: {}.",
+                getRuntimeContext().getIndexOfThisSubtask(),
+                restoredOffsetState);
+    }
+
+    private void restoreHistoryRecordsState() throws Exception {
+        DocumentReader reader = DocumentReader.defaultReader();
+        ConcurrentLinkedQueue<SchemaRecord> historyRecords = new ConcurrentLinkedQueue<>();
+        int recordsCount = 0;
+        boolean firstEntry = true;
+        for (String record : schemaRecordsState.get()) {
+            if (firstEntry) {
+                // we store the engine instance name in the first element
+                this.engineInstanceName = record;
+                firstEntry = false;
+            } else {
+                // Put the records into the state. The database history should read, reorganize and
+                // register the state.
+                historyRecords.add(new SchemaRecord(reader.read(record)));
+                recordsCount++;
+            }
+        }
+        if (engineInstanceName != null) {
+            registerHistory(engineInstanceName, historyRecords);
+        }
+        LOG.info(
+                "Consumer subtask {} restored history records state: {} with {} records.",
+                getRuntimeContext().getIndexOfThisSubtask(),
+                engineInstanceName,
+                recordsCount);
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
+        if (handover.hasError()) {
+            LOG.debug("snapshotState() called on closed source");
+            throw new FlinkRuntimeException(
+                    "Call snapshotState() on closed source, checkpoint failed.");
+        } else {
+            snapshotOffsetState(functionSnapshotContext.getCheckpointId());
+            snapshotHistoryRecordsState();
+        }
+    }
+
+    private void snapshotOffsetState(long checkpointId) throws Exception {
+        offsetState.clear();
+
+        final DebeziumChangeFetcher<?> fetcher = this.debeziumChangeFetcher;
+
+        byte[] serializedOffset = null;
+        if (fetcher == null) {
+            // the fetcher has not yet been initialized, which means we need to return the
+            // originally restored offsets
+            if (restoredOffsetState != null) {
+                serializedOffset = restoredOffsetState.getBytes(StandardCharsets.UTF_8);
+            }
+        } else {
+            byte[] currentState = fetcher.snapshotCurrentState();
+            if (currentState == null && restoredOffsetState != null) {
+                // the fetcher has been initialized, but has not yet received any data,
+                // which means we need to return the originally restored offsets.
+                serializedOffset = restoredOffsetState.getBytes(StandardCharsets.UTF_8);
+            } else {
+                serializedOffset = currentState;
+            }
+        }
+
+        if (serializedOffset != null) {
+            offsetState.add(serializedOffset);
+            // the map cannot be asynchronously updated, because only one checkpoint call
+            // can happen on this function at a time: either snapshotState() or
+            // notifyCheckpointComplete()
+            pendingOffsetsToCommit.put(checkpointId, serializedOffset);
+            // truncate the map of pending offsets to commit, to prevent infinite growth
+            while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
+                pendingOffsetsToCommit.remove(0);
+            }
+        }
+    }
+
+    private void snapshotHistoryRecordsState() throws Exception {
+        schemaRecordsState.clear();
+
+        if (engineInstanceName != null) {
+            schemaRecordsState.add(engineInstanceName);
+            Collection<SchemaRecord> records = retrieveHistory(engineInstanceName);
+            DocumentWriter writer = DocumentWriter.defaultWriter();
+            for (SchemaRecord record : records) {
+                schemaRecordsState.add(writer.write(record.toDocument()));
+            }
+        }
+    }
+
+    @Override
+    public void run(SourceContext<T> sourceContext) throws Exception {
+
+        // initialize metrics
+        // make RuntimeContext#getMetricGroup compatible between Flink 1.13 and Flink 1.14
+        final Method getMetricGroupMethod =
+                getRuntimeContext().getClass().getMethod("getMetricGroup");
+        getMetricGroupMethod.setAccessible(true);
+        final MetricGroup metricGroup =
+                (MetricGroup) getMetricGroupMethod.invoke(getRuntimeContext());
+
+        metricGroup.gauge(
+                "currentFetchEventTimeLag",
+                (Gauge<Long>) () -> debeziumChangeFetcher.getFetchDelay());
+        metricGroup.gauge(
+                "currentEmitEventTimeLag",
+                (Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay());
+        metricGroup.gauge(
+                "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
+        if (StringUtils.isNotEmpty(this.inlongMetric)) {
+            String[] inlongMetricArray = inlongMetric.split(Constants.DELIMITER);
+            inLongGroupId = inlongMetricArray[0];
+            inLongStreamId = inlongMetricArray[1];
+            String nodeId = inlongMetricArray[2];
+            metricData = new SourceMetricData(inLongGroupId, inLongStreamId, nodeId, metricGroup);
+            metricData.registerMetricsForNumRecordsIn();
+            metricData.registerMetricsForNumBytesIn();
+            metricData.registerMetricsForNumBytesInPerSecond();
+            metricData.registerMetricsForNumRecordsInPerSecond();
+        }
+        if (auditHostAndPorts != null) {
+            AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
+            auditImp = AuditImp.getInstance();
+        }
+        properties.setProperty("name", "engine");
+        properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
+        if (restoredOffsetState != null) {
+            // restored from state
+            properties.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, restoredOffsetState);
+        }
+        // DO NOT include schema change, e.g. DDL
+        properties.setProperty("include.schema.changes", "false");
+        // disable the offset flush totally
+        properties.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE));
+        // disable tombstones
+        properties.setProperty("tombstones.on.delete", "false");
+        if (engineInstanceName == null) {
+            // not restore from recovery
+            engineInstanceName = UUID.randomUUID().toString();
+        }
+        // history instance name to initialize FlinkDatabaseHistory
+        properties.setProperty(
+                FlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME, engineInstanceName);
+        // we have to use a persisted DatabaseHistory implementation, otherwise, recovery can't
+        // continue to read binlog
+        // see
+        // https://stackoverflow.com/questions/57147584/debezium-error-schema-isnt-know-to-this-connector
+        // and https://debezium.io/blog/2018/03/16/note-on-database-history-topic-configuration/
+        properties.setProperty("database.history", determineDatabase().getCanonicalName());
+
+        // we have to filter out the heartbeat events, otherwise the deserializer will fail
+        String dbzHeartbeatPrefix =
+                properties.getProperty(
+                        Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(),
+                        Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());
+        this.debeziumChangeFetcher =
+                new DebeziumChangeFetcher<>(
+                        sourceContext,
+                        new DebeziumDeserializationSchema<T>() {
+                            @Override
+                            public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
+                                outputMetrics(record);
+                                deserializer.deserialize(record, out);
+                            }
+
+                            @Override
+                            public TypeInformation<T> getProducedType() {
+                                return deserializer.getProducedType();
+                            }
+                        },
+                        restoredOffsetState == null, // DB snapshot phase if restore state is null
+                        dbzHeartbeatPrefix,
+                        handover);
+
+        // create the engine with this configuration ...
+        this.engine =
+                DebeziumEngine.create(Connect.class)
+                        .using(properties)
+                        .notifying(changeConsumer)
+                        .using(OffsetCommitPolicy.always())
+                        .using(
+                                (success, message, error) -> {
+                                    if (success) {
+                                        // Close the handover and prepare to exit.
+                                        handover.close();
+                                    } else {
+                                        handover.reportError(error);
+                                    }
+                                })
+                        .build();
+
+        // run the engine asynchronously
+        executor.execute(engine);
+        debeziumStarted = true;
+
+        // start the real debezium consumer
+        debeziumChangeFetcher.runFetchLoop();
+    }
+
+    private void outputMetrics(SourceRecord record) {
+        if (metricData != null) {
+            metricData.getNumRecordsIn().inc(1L);
+            metricData.getNumBytesIn()
+                    .inc(record.value().toString().getBytes(StandardCharsets.UTF_8).length);
+        }
+        if (auditImp != null) {
+            auditImp.add(
+                Constants.AUDIT_SORT_INPUT,
+                inLongGroupId,
+                inLongStreamId,
+                System.currentTimeMillis(),
+                1,
+                record.value().toString().getBytes(StandardCharsets.UTF_8).length);
+        }
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        if (!debeziumStarted) {
+            LOG.debug("notifyCheckpointComplete() called when engine is not started.");
+            return;
+        }
+
+        final DebeziumChangeFetcher<T> fetcher = this.debeziumChangeFetcher;
+        if (fetcher == null) {
+            LOG.debug("notifyCheckpointComplete() called on uninitialized source");
+            return;
+        }
+
+        try {
+            final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
+            if (posInMap == -1) {
+                LOG.warn(
+                        "Consumer subtask {} received confirmation for unknown checkpoint id {}",
+                        getRuntimeContext().getIndexOfThisSubtask(),
+                        checkpointId);
+                return;
+            }
+
+            byte[] serializedOffsets = (byte[]) pendingOffsetsToCommit.remove(posInMap);
+
+            // remove older checkpoints in map
+            for (int i = 0; i < posInMap; i++) {
+                pendingOffsetsToCommit.remove(0);
+            }
+
+            if (serializedOffsets == null || serializedOffsets.length == 0) {
+                LOG.debug(
+                        "Consumer subtask {} has empty checkpoint state.",
+                        getRuntimeContext().getIndexOfThisSubtask());
+                return;
+            }
+
+            DebeziumOffset offset =
+                    DebeziumOffsetSerializer.INSTANCE.deserialize(serializedOffsets);
+            changeConsumer.commitOffset(offset);
+        } catch (Exception e) {
+            // ignore exception if we are no longer running
+            LOG.warn("Ignore error when committing offset to database.", e);
+        }
+    }
+
+    @Override
+    public void cancel() {
+        // safely and gracefully stop the engine
+        shutdownEngine();
+        if (debeziumChangeFetcher != null) {
+            debeziumChangeFetcher.close();
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        cancel();
+
+        if (executor != null) {
+            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+        }
+
+        super.close();
+    }
+
+    /**
+     * Safely and gracefully stop the Debezium engine.
+     */
+    private void shutdownEngine() {
+        try {
+            if (engine != null) {
+                engine.close();
+            }
+        } catch (IOException e) {
+            ExceptionUtils.rethrow(e);
+        } finally {
+            if (executor != null) {
+                executor.shutdownNow();
+            }
+
+            debeziumStarted = false;
+
+            if (handover != null) {
+                handover.close();
+            }
+        }
+    }
+
+    @Override
+    public TypeInformation<T> getProducedType() {
+        return deserializer.getProducedType();
+    }
+
+    @VisibleForTesting
+    public LinkedMap getPendingOffsetsToCommit() {
+        return pendingOffsetsToCommit;
+    }
+
+    @VisibleForTesting
+    public boolean getDebeziumStarted() {
+        return debeziumStarted;
+    }
+
+    private Class<?> determineDatabase() {
+        boolean isCompatibleWithLegacy =
+                FlinkDatabaseHistory.isCompatible(retrieveHistory(engineInstanceName));
+        if (LEGACY_IMPLEMENTATION_VALUE.equals(properties.get(LEGACY_IMPLEMENTATION_KEY))) {
+            // specifies the legacy implementation but the state may be incompatible
+            if (isCompatibleWithLegacy) {
+                return FlinkDatabaseHistory.class;
+            } else {
+                throw new IllegalStateException(
+                        "The configured option 'debezium.internal.implementation' is 'legacy', but the state of "
+                                + "source is incompatible with this implementation, you should remove the the option.");
+            }
+        } else if (FlinkDatabaseSchemaHistory.isCompatible(retrieveHistory(engineInstanceName))) {
+            // tries the non-legacy first
+            return FlinkDatabaseSchemaHistory.class;
+        } else if (isCompatibleWithLegacy) {
+            // fallback to legacy if possible
+            return FlinkDatabaseHistory.class;
+        } else {
+            // impossible
+            throw new IllegalStateException("Can't determine which DatabaseHistory to use.");
+        }
+    }
+
+    @VisibleForTesting
+    public String getEngineInstanceName() {
+        return engineInstanceName;
+    }
+
+    public SourceMetricData getMetricData() {
+        return metricData;
+    }
+}
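
The run() method above registers the standard fetch/emit-lag gauges and, when the inlongMetric label is present, splits it into group id, stream id and node id before registering the SourceMetricData counters and meters; outputMetrics() then counts one record plus the UTF-8 size of record.value() per emitted SourceRecord and optionally reports the same numbers to the InLong audit proxy. A small sketch of that label parsing follows, assuming '&' is the value of Constants.DELIMITER from sort-connector-base:

    // Sketch only: mirrors the label parsing performed in run(); the real delimiter comes from
    // org.apache.inlong.sort.base.Constants.DELIMITER ("&" is assumed here).
    public class InlongMetricLabelSketch {
        public static void main(String[] args) {
            String inlongMetric = "my_group&my_stream&my_node";   // placeholder label string
            String[] parts = inlongMetric.split("&");
            String groupId = parts[0];    // inLongGroupId, also used for audit reporting
            String streamId = parts[1];   // inLongStreamId
            String nodeId = parts[2];     // sort node id passed to SourceMetricData
            System.out.printf("group=%s stream=%s node=%s%n", groupId, streamId, nodeId);
        }
    }
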
diff --git a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableFactory.java b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableFactory.java
new file mode 100644
index 000000000..e6a7e5be6
--- /dev/null
+++ b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableFactory.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.sqlserver.table;
+
+import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.time.ZoneId;
+import java.util.HashSet;
+import java.util.Set;
+
+import static com.ververica.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
+import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
+import static com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+
+/** Factory for creating configured instance of {@link SqlServerTableSource}. */
+public class SqlServerTableFactory implements DynamicTableSourceFactory {
+
+    private static final String IDENTIFIER = "sqlserver-cdc-inlong";
+
+    private static final ConfigOption<String> HOSTNAME =
+            ConfigOptions.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("IP address or hostname of the SqlServer database server.");
+
+    private static final ConfigOption<Integer> PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .defaultValue(1433)
+                    .withDescription("Integer port number of the SqlServer database server.");
+
+    private static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the SqlServer database to use when connecting to the SqlServer database server.");
+
+    private static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the SqlServer database server.");
+
+    private static final ConfigOption<String> DATABASE_NAME =
+            ConfigOptions.key("database-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Database name of the SqlServer server to monitor.");
+
+    private static final ConfigOption<String> SCHEMA_NAME =
+            ConfigOptions.key("schema-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Schema name of the SqlServer database to monitor.");
+
+    private static final ConfigOption<String> TABLE_NAME =
+            ConfigOptions.key("table-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Table name of the SqlServer database to monitor.");
+
+    public static final ConfigOption<String> SERVER_TIME_ZONE =
+            ConfigOptions.key("server-time-zone")
+                    .stringType()
+                    .defaultValue("UTC")
+                    .withDescription("The session time zone in database server.");
+
+    public static final ConfigOption<String> SCAN_STARTUP_MODE =
+            ConfigOptions.key("scan.startup.mode")
+                    .stringType()
+                    .defaultValue("initial")
+                    .withDescription(
+                            "Optional startup mode for SqlServer CDC consumer, valid enumerations are "
+                                    + "\"initial\", \"initial-only\", \"latest-offset\"");
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        final FactoryUtil.TableFactoryHelper helper =
+                FactoryUtil.createTableFactoryHelper(this, context);
+        helper.validateExcept(DEBEZIUM_OPTIONS_PREFIX);
+
+        final ReadableConfig config = helper.getOptions();
+        String hostname = config.get(HOSTNAME);
+        String username = config.get(USERNAME);
+        String password = config.get(PASSWORD);
+        String schemaName = config.get(SCHEMA_NAME);
+        String databaseName = config.get(DATABASE_NAME);
+        String tableName = config.get(TABLE_NAME);
+        String inlongMetric = config.get(INLONG_METRIC);
+        String auditHostAndPorts = config.get(INLONG_AUDIT);
+        ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE));
+        int port = config.get(PORT);
+        StartupOptions startupOptions = getStartupOptions(config);
+        ResolvedSchema physicalSchema =
+                getPhysicalSchema(context.getCatalogTable().getResolvedSchema());
+
+        return new SqlServerTableSource(
+                physicalSchema,
+                port,
+                hostname,
+                databaseName,
+                schemaName,
+                tableName,
+                serverTimeZone,
+                username,
+                password,
+                getDebeziumProperties(context.getCatalogTable().getOptions()),
+                startupOptions,
+                inlongMetric, auditHostAndPorts);
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(HOSTNAME);
+        options.add(USERNAME);
+        options.add(PASSWORD);
+        options.add(DATABASE_NAME);
+        options.add(SCHEMA_NAME);
+        options.add(TABLE_NAME);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(PORT);
+        options.add(SERVER_TIME_ZONE);
+        options.add(SCAN_STARTUP_MODE);
+        options.add(INLONG_METRIC);
+        options.add(INLONG_AUDIT);
+        return options;
+    }
+
+    private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial";
+    private static final String SCAN_STARTUP_MODE_VALUE_INITIAL_ONLY = "initial-only";
+    private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
+
+    private static StartupOptions getStartupOptions(ReadableConfig config) {
+        String modeString = config.get(SCAN_STARTUP_MODE);
+
+        switch (modeString.toLowerCase()) {
+            case SCAN_STARTUP_MODE_VALUE_INITIAL:
+                return StartupOptions.initial();
+
+            case SCAN_STARTUP_MODE_VALUE_INITIAL_ONLY:
+                return StartupOptions.initialOnly();
+
+            case SCAN_STARTUP_MODE_VALUE_LATEST:
+                return StartupOptions.latest();
+
+            default:
+                throw new ValidationException(
+                        String.format(
+                                "Invalid value for option '%s'. Supported values are [%s, %s, %s], but was: %s",
+                                SCAN_STARTUP_MODE.key(),
+                                SCAN_STARTUP_MODE_VALUE_INITIAL,
+                                SCAN_STARTUP_MODE_VALUE_INITIAL_ONLY,
+                                SCAN_STARTUP_MODE_VALUE_LATEST,
+                                modeString));
+        }
+    }
+}
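
SqlServerTableFactory registers the connector identifier "sqlserver-cdc-inlong" and exposes INLONG_METRIC and INLONG_AUDIT as optional table options. A hedged DDL sketch follows (not part of the commit); the option key strings 'inlong.metric' and 'inlong.audit' are assumed from the Constants class in sort-connector-base, and all connection values are placeholders.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class SqlServerCdcDdlSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

            // 'inlong.metric' / 'inlong.audit' are the assumed keys behind INLONG_METRIC / INLONG_AUDIT.
            tEnv.executeSql(
                    "CREATE TABLE products (\n"
                            + "  id INT,\n"
                            + "  name STRING,\n"
                            + "  PRIMARY KEY (id) NOT ENFORCED\n"
                            + ") WITH (\n"
                            + "  'connector' = 'sqlserver-cdc-inlong',\n"
                            + "  'hostname' = 'localhost',\n"
                            + "  'port' = '1433',\n"
                            + "  'username' = 'sa',\n"
                            + "  'password' = 'Password!',\n"
                            + "  'database-name' = 'inventory',\n"
                            + "  'schema-name' = 'dbo',\n"
                            + "  'table-name' = 'products',\n"
                            + "  'scan.startup.mode' = 'initial',\n"
                            + "  'inlong.metric' = 'my_group&my_stream&my_node',\n"
                            + "  'inlong.audit' = 'audit-host:10081'\n"
                            + ")");

            tEnv.executeSql("SELECT * FROM products").print();
        }
    }
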
diff --git a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableSource.java b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableSource.java
new file mode 100644
index 000000000..71dc6c784
--- /dev/null
+++ b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableSource.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.sqlserver.table;
+
+import com.ververica.cdc.connectors.sqlserver.table.SqlServerDeserializationConverterFactory;
+import com.ververica.cdc.connectors.sqlserver.table.SqlServerReadableMetadata;
+import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import org.apache.inlong.sort.cdc.sqlserver.SqlServerSource;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.table.MetadataConverter;
+import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
+
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link DynamicTableSource} that describes how to create a SqlServer source from a logical
+ * description.
+ */
+public class SqlServerTableSource implements ScanTableSource, SupportsReadingMetadata {
+
+    private final ResolvedSchema physicalSchema;
+    private final int port;
+    private final String hostname;
+    private final String database;
+    private final String schemaName;
+    private final String tableName;
+    private final ZoneId serverTimeZone;
+    private final String username;
+    private final String password;
+    private final Properties dbzProperties;
+    private final StartupOptions startupOptions;
+
+    // --------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // --------------------------------------------------------------------------------------------
+
+    /** Data type that describes the final output of the source. */
+    protected DataType producedDataType;
+
+    /** Metadata that is appended at the end of a physical source row. */
+    protected List<String> metadataKeys;
+
+    private String inlongMetric;
+
+    private String auditHostAndPorts;
+
+    public SqlServerTableSource(
+            ResolvedSchema physicalSchema,
+            int port,
+            String hostname,
+            String database,
+            String schemaName,
+            String tableName,
+            ZoneId serverTimeZone,
+            String username,
+            String password,
+            Properties dbzProperties,
+            StartupOptions startupOptions,
+            String inlongMetric,
+            String auditHostAndPorts) {
+        this.physicalSchema = physicalSchema;
+        this.port = port;
+        this.hostname = checkNotNull(hostname);
+        this.database = checkNotNull(database);
+        this.schemaName = checkNotNull(schemaName);
+        this.tableName = checkNotNull(tableName);
+        this.serverTimeZone = serverTimeZone;
+        this.username = checkNotNull(username);
+        this.password = checkNotNull(password);
+        this.dbzProperties = dbzProperties;
+        this.producedDataType = physicalSchema.toPhysicalRowDataType();
+        this.metadataKeys = Collections.emptyList();
+        this.startupOptions = startupOptions;
+        this.inlongMetric = inlongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return ChangelogMode.newBuilder()
+                .addContainedKind(RowKind.INSERT)
+                .addContainedKind(RowKind.UPDATE_BEFORE)
+                .addContainedKind(RowKind.UPDATE_AFTER)
+                .addContainedKind(RowKind.DELETE)
+                .build();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+        RowType physicalDataType =
+                (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
+        MetadataConverter[] metadataConverters = getMetadataConverters();
+        TypeInformation<RowData> typeInfo = scanContext.createTypeInformation(producedDataType);
+
+        DebeziumDeserializationSchema<RowData> deserializer =
+                RowDataDebeziumDeserializeSchema.newBuilder()
+                        .setPhysicalRowType(physicalDataType)
+                        .setMetadataConverters(metadataConverters)
+                        .setResultTypeInfo(typeInfo)
+                        .setServerTimeZone(serverTimeZone)
+                        .setUserDefinedConverterFactory(
+                                SqlServerDeserializationConverterFactory.instance())
+                        .build();
+        DebeziumSourceFunction<RowData> sourceFunction =
+                SqlServerSource.<RowData>builder()
+                        .hostname(hostname)
+                        .port(port)
+                        .database(database)
+                        .tableList(schemaName + "." + tableName)
+                        .username(username)
+                        .password(password)
+                        .debeziumProperties(dbzProperties)
+                        .startupOptions(startupOptions)
+                        .deserializer(deserializer)
+                        .inlongMetric(inlongMetric)
+                        .auditHostAndPorts(auditHostAndPorts)
+                        .build();
+        return SourceFunctionProvider.of(sourceFunction, false);
+    }
+
+    private MetadataConverter[] getMetadataConverters() {
+        if (metadataKeys.isEmpty()) {
+            return new MetadataConverter[0];
+        }
+
+        return metadataKeys.stream()
+                .map(
+                        key ->
+                                Stream.of(SqlServerReadableMetadata.values())
+                                        .filter(m -> m.getKey().equals(key))
+                                        .findFirst()
+                                        .orElseThrow(IllegalStateException::new))
+                .map(SqlServerReadableMetadata::getConverter)
+                .toArray(MetadataConverter[]::new);
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        SqlServerTableSource source =
+                new SqlServerTableSource(
+                        physicalSchema,
+                        port,
+                        hostname,
+                        database,
+                        schemaName,
+                        tableName,
+                        serverTimeZone,
+                        username,
+                        password,
+                        dbzProperties,
+                        startupOptions,
+                        inlongMetric,
+                        auditHostAndPorts);
+        source.metadataKeys = metadataKeys;
+        source.producedDataType = producedDataType;
+        return source;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        SqlServerTableSource that = (SqlServerTableSource) o;
+        return port == that.port
+                && Objects.equals(physicalSchema, that.physicalSchema)
+                && Objects.equals(hostname, that.hostname)
+                && Objects.equals(database, that.database)
+                && Objects.equals(schemaName, that.schemaName)
+                && Objects.equals(tableName, that.tableName)
+                && Objects.equals(serverTimeZone, that.serverTimeZone)
+                && Objects.equals(username, that.username)
+                && Objects.equals(password, that.password)
+                && Objects.equals(dbzProperties, that.dbzProperties)
+                && Objects.equals(startupOptions, that.startupOptions)
+                && Objects.equals(producedDataType, that.producedDataType)
+                && Objects.equals(metadataKeys, that.metadataKeys);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                physicalSchema,
+                port,
+                hostname,
+                database,
+                schemaName,
+                tableName,
+                serverTimeZone,
+                username,
+                password,
+                dbzProperties,
+                startupOptions,
+                producedDataType,
+                metadataKeys);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "SqlServer-CDC";
+    }
+
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        return Stream.of(SqlServerReadableMetadata.values())
+                .collect(
+                        Collectors.toMap(
+                                SqlServerReadableMetadata::getKey,
+                                SqlServerReadableMetadata::getDataType));
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+        this.metadataKeys = metadataKeys;
+        this.producedDataType = producedDataType;
+    }
+}
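
Because SqlServerTableSource implements SupportsReadingMetadata, the keys returned by listReadableMetadata() can be declared as METADATA columns in DDL. A sketch follows, reusing the TableEnvironment and placeholder connection options from the previous sketch; the metadata key names ('database_name', 'schema_name', 'table_name', 'op_ts') are those defined by SqlServerReadableMetadata in flink-cdc 2.2 and are stated here as assumptions.

    // Sketch only: metadata columns resolved through listReadableMetadata()/applyReadableMetadata().
    tEnv.executeSql(
            "CREATE TABLE products_with_meta (\n"
                    + "  db_name STRING METADATA FROM 'database_name' VIRTUAL,\n"
                    + "  schema_nm STRING METADATA FROM 'schema_name' VIRTUAL,\n"
                    + "  tbl_name STRING METADATA FROM 'table_name' VIRTUAL,\n"
                    + "  op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,\n"
                    + "  id INT,\n"
                    + "  name STRING\n"
                    + ") WITH (\n"
                    + "  'connector' = 'sqlserver-cdc-inlong',\n"
                    + "  'hostname' = 'localhost',\n"
                    + "  'port' = '1433',\n"
                    + "  'username' = 'sa',\n"
                    + "  'password' = 'Password!',\n"
                    + "  'database-name' = 'inventory',\n"
                    + "  'schema-name' = 'dbo',\n"
                    + "  'table-name' = 'products'\n"
                    + ")");
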
diff --git a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/resources/META-INF.services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/resources/META-INF.services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..156311bd2
--- /dev/null
+++ b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/resources/META-INF.services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.cdc.sqlserver.table.SqlServerTableFactory
\ No newline at end of file
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index db56dcfee..77f3d5e07 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -510,6 +510,13 @@
   Source  : org.apache.flink:flink-connector-jdbc_2.11:1.13.5 (Please note that the software have been modified.)
   License : https://github.com/apache/flink/blob/master/LICENSE
 
+ 1.3.8 inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java
+       inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableFactory.java
+       inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableSource.java
+       inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/SqlServerSource.java
+  Source  : flink-cdc-connectors 2.2.1 (Please note that the software have been modified.)
+  License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
+
 =======================================================================
 Apache InLong Subcomponents: