Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/01 09:17:36 UTC

[inlong] branch master updated: [INLONG-5242][Sort] Add metric computing for mongoDB and Oracle (#5298)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9038879f7 [INLONG-5242][Sort] Add metric computing for mongoDB and Oracle (#5298)
9038879f7 is described below

commit 9038879f7eb2f7e7f5f4f16cfc397c9fadd304d1
Author: Xin Gong <ge...@gmail.com>
AuthorDate: Mon Aug 1 17:17:32 2022 +0800

    [INLONG-5242][Sort] Add metric computing for mongoDB and Oracle (#5298)
---
 .../org/apache/inlong/sort/base/Constants.java     |   5 +
 .../inlong/sort/base/metric/SinkMetricData.java    |  90 ++++-
 .../inlong/sort/base/metric/SourceMetricData.java  |  55 ++-
 inlong-sort/sort-connectors/mongodb-cdc/pom.xml    |   6 +
 .../sort/cdc/mongodb}/DebeziumSourceFunction.java  |  55 +--
 .../inlong/sort/cdc/mongodb/MongoDBSource.java     | 447 +++++++++++++++++++++
 .../sort/cdc/mongodb/table/MongoDBTableSource.java | 312 ++++++++++++++
 .../mongodb/table/MongoDBTableSourceFactory.java   | 295 ++++++++++++++
 .../org.apache.flink.table.factories.Factory       |  18 +
 .../inlong/sort/cdc/mysql/source/MySqlSource.java  |   2 +-
 .../mysql/source/config/MySqlSourceOptions.java    |   2 +-
 inlong-sort/sort-connectors/oracle-cdc/pom.xml     |   6 +
 .../sort/cdc/oracle}/DebeziumSourceFunction.java   |  55 +--
 .../inlong/sort/cdc/oracle/OracleSource.java       | 183 +++++++++
 .../sort/cdc/oracle/table/OracleTableSource.java   | 246 ++++++++++++
 .../oracle/table/OracleTableSourceFactory.java}    | 101 ++---
 .../org.apache.flink.table.factories.Factory       |  18 +
 inlong-sort/sort-connectors/postgres-cdc/pom.xml   |   1 +
 .../DebeziumSourceFunction.java                    |   3 +-
 .../cdc/postgres/table/PostgreSQLTableFactory.java |   2 +-
 licenses/inlong-sort-connectors/LICENSE            |  22 +-
 21 files changed, 1812 insertions(+), 112 deletions(-)

diff --git a/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/Constants.java
index c9c749ad6..bda4e7475 100644
--- a/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -46,4 +46,9 @@ public final class Constants {
 
     public static final String NUM_RECORDS_IN_PER_SECOND = "numRecordsInPerSecond";
 
+    /**
+     * Delimiter that separates groupId, streamId and nodeId in the inlong.metric option value.
+     */
+    public static final String DELIMITER = "&";
+
 }
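
The new DELIMITER constant is what the CDC source functions use to split the
inlong.metric option value into its three label values. A minimal sketch of that
parsing, with illustrative values that are not taken from this patch:

    // inlong.metric carries groupId, streamId and nodeId joined by Constants.DELIMITER ("&")
    String inlongMetric = "test_group&test_stream&mongodb_source_1";
    String[] labels = inlongMetric.split(Constants.DELIMITER);
    String groupId = labels[0];   // "test_group"
    String streamId = labels[1];  // "test_stream"
    String nodeId = labels[2];    // "mongodb_source_1"
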
diff --git a/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
index c3af8f027..27c3a2194 100644
--- a/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
+++ b/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -22,6 +22,7 @@ import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MeterView;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
 
 /**
  * A collection class for handling metrics
@@ -45,16 +46,66 @@ public class SinkMetricData {
         this.metricGroup = metricGroup;
     }
 
+    /**
+     * The default counter is a {@link SimpleCounter}.
+     * groupId, streamId and nodeId are label values; users can filter metric data by them when using a
+     * metric reporter such as Prometheus.
+     *
+     * @param groupId inlong groupId
+     * @param streamId inlong streamId
+     * @param nodeId inlong nodeId
+     * @param metricName metric name
+     */
     public void registerMetricsForNumRecordsOut(String groupId, String streamId, String nodeId, String metricName) {
+        registerMetricsForNumRecordsOut(groupId, streamId, nodeId, metricName, new SimpleCounter());
+    }
+
+    /**
+     * Users can supply a custom counter that implements {@link Counter}.
+     * groupId, streamId and nodeId are label values; users can filter metric data by them when using a
+     * metric reporter such as Prometheus.
+     *
+     * @param groupId inlong groupId
+     * @param streamId inlong streamId
+     * @param nodeId inlong nodeId
+     * @param metricName metric name
+     * @param counter custom {@link Counter} implementation to register
+     */
+    public void registerMetricsForNumRecordsOut(String groupId, String streamId, String nodeId, String metricName,
+            Counter counter) {
         numRecordsOut =
                 metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
-                        .counter(metricName);
+                        .counter(metricName, counter);
     }
 
+    /**
+     * The default counter is a {@link SimpleCounter}.
+     * groupId, streamId and nodeId are label values; users can filter metric data by them when using a
+     * metric reporter such as Prometheus.
+     *
+     * @param groupId inlong groupId
+     * @param streamId inlong streamId
+     * @param nodeId inlong nodeId
+     * @param metricName metric name
+     */
     public void registerMetricsForNumBytesOut(String groupId, String streamId, String nodeId, String metricName) {
+        registerMetricsForNumBytesOut(groupId, streamId, nodeId, metricName, new SimpleCounter());
+    }
+
+    /**
+     * Users can supply a custom counter that implements {@link Counter}.
+     * groupId, streamId and nodeId are label values; users can filter metric data by them when using a
+     * metric reporter such as Prometheus.
+     *
+     * @param groupId inlong groupId
+     * @param streamId inlong streamId
+     * @param nodeId inlong nodeId
+     * @param metricName metric name
+     * @param counter custom {@link Counter} implementation to register
+     */
+    public void registerMetricsForNumBytesOut(String groupId, String streamId, String nodeId, String metricName,
+            Counter counter) {
         numBytesOut =
                 metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
-                        .counter(metricName);
+                        .counter(metricName, counter);
     }
 
     public void registerMetricsForNumRecordsOutPerSecond(String groupId, String streamId, String nodeId,
@@ -73,16 +124,45 @@ public class SinkMetricData {
 
     public void registerMetricsForDirtyRecords(String groupId, String streamId, String nodeId,
             String metricName) {
+        registerMetricsForDirtyRecords(groupId, streamId, nodeId, metricName, new SimpleCounter());
+    }
+
+    public void registerMetricsForDirtyRecords(String groupId, String streamId, String nodeId,
+            String metricName, Counter counter) {
         dirtyRecords = metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
-                .counter(metricName);
+                .counter(metricName, counter);
     }
 
+    /**
+     * The default counter is a {@link SimpleCounter}.
+     * groupId, streamId and nodeId are label values; users can filter metric data by them when using a
+     * metric reporter such as Prometheus.
+     *
+     * @param groupId inlong groupId
+     * @param streamId inlong streamId
+     * @param nodeId inlong nodeId
+     * @param metricName metric name
+     */
     public void registerMetricsForDirtyBytes(String groupId, String streamId, String nodeId,
             String metricName) {
+        registerMetricsForDirtyBytes(groupId, streamId, nodeId, metricName, new SimpleCounter());
+    }
+
+    /**
+     * Users can supply a custom counter that implements {@link Counter}.
+     * groupId, streamId and nodeId are label values; users can filter metric data by them when using a
+     * metric reporter such as Prometheus.
+     *
+     * @param groupId inlong groupId
+     * @param streamId inlong streamId
+     * @param nodeId inlong nodeId
+     * @param metricName metric name
+     * @param counter custom {@link Counter} implementation to register
+     */
+    public void registerMetricsForDirtyBytes(String groupId, String streamId, String nodeId,
+            String metricName, Counter counter) {
         dirtyBytes =
                 metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
-                        .counter(metricName);
-
+                        .counter(metricName, counter);
     }
 
     public Counter getNumRecordsOut() {
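
A hedged usage sketch of the extended SinkMetricData API: the existing
registration methods keep the default SimpleCounter, while the new overloads
accept any Counter implementation. It assumes the code runs inside a
RichFunction's open() and that SinkMetricData is constructed from a MetricGroup,
as the source-side counterpart is; label values and metric names are illustrative:

    MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
    SinkMetricData sinkMetricData = new SinkMetricData(metricGroup);
    // default SimpleCounter
    sinkMetricData.registerMetricsForNumRecordsOut("test_group", "test_stream", "mysql_sink_1", "numRecordsOut");
    // new overload: pass a custom Counter implementation
    Counter bytesOutCounter = new SimpleCounter();
    sinkMetricData.registerMetricsForNumBytesOut("test_group", "test_stream", "mysql_sink_1", "numBytesOut", bytesOutCounter);
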
diff --git a/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index 8a065836f..a962e2938 100644
--- a/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++ b/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -22,6 +22,7 @@ import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MeterView;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
 
 /**
  * A collection class for handling metrics
@@ -42,16 +43,66 @@ public class SourceMetricData {
         this.metricGroup = metricGroup;
     }
 
+    /**
+     * The default counter is a {@link SimpleCounter}.
+     * groupId, streamId and nodeId are label values; users can filter metric data by them when using a
+     * metric reporter such as Prometheus.
+     *
+     * @param groupId inlong groupId
+     * @param streamId inlong streamId
+     * @param nodeId inlong nodeId
+     * @param metricName metric name
+     */
     public void registerMetricsForNumRecordsIn(String groupId, String streamId, String nodeId, String metricName) {
+        registerMetricsForNumRecordsIn(groupId, streamId, nodeId, metricName, new SimpleCounter());
+    }
+
+    /**
+     * Users can supply a custom counter that implements {@link Counter}.
+     * groupId, streamId and nodeId are label values; users can filter metric data by them when using a
+     * metric reporter such as Prometheus.
+     *
+     * @param groupId inlong groupId
+     * @param streamId inlong streamId
+     * @param nodeId inlong nodeId
+     * @param metricName metric name
+     * @param counter custom {@link Counter} implementation to register
+     */
+    public void registerMetricsForNumRecordsIn(String groupId, String streamId, String nodeId, String metricName,
+            Counter counter) {
         numRecordsIn =
                 metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
-                        .counter(metricName);
+                        .counter(metricName, counter);
     }
 
+    /**
+     * The default counter is a {@link SimpleCounter}.
+     * groupId, streamId and nodeId are label values; users can filter metric data by them when using a
+     * metric reporter such as Prometheus.
+     *
+     * @param groupId inlong groupId
+     * @param streamId inlong streamId
+     * @param nodeId inlong nodeId
+     * @param metricName metric name
+     */
     public void registerMetricsForNumBytesIn(String groupId, String streamId, String nodeId, String metricName) {
+        registerMetricsForNumBytesIn(groupId, streamId, nodeId, metricName, new SimpleCounter());
+    }
+
+    /**
+     * Users can supply a custom counter that implements {@link Counter}.
+     * groupId, streamId and nodeId are label values; users can filter metric data by them when using a
+     * metric reporter such as Prometheus.
+     *
+     * @param groupId inlong groupId
+     * @param streamId inlong streamId
+     * @param nodeId inlong nodeId
+     * @param metricName metric name
+     * @param counter custom {@link Counter} implementation to register
+     */
+    public void registerMetricsForNumBytesIn(String groupId, String streamId, String nodeId, String metricName,
+            Counter counter) {
         numBytesIn =
                 metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
-                        .counter(metricName);
+                        .counter(metricName, counter);
     }
 
     public void registerMetricsForNumRecordsInPerSecond(String groupId, String streamId, String nodeId,
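
On the source side, the pattern followed by the Debezium-based sources (see the
DebeziumSourceFunction changes below) is to register the counters once when the
source opens and to increment them for every emitted record. A condensed sketch
with illustrative label values:

    SourceMetricData metricData = new SourceMetricData(metricGroup);
    metricData.registerMetricsForNumRecordsIn("test_group", "test_stream", "mongodb_source_1", Constants.NUM_RECORDS_IN);
    metricData.registerMetricsForNumBytesIn("test_group", "test_stream", "mongodb_source_1", Constants.NUM_BYTES_IN);

    // later, for each deserialized SourceRecord
    metricData.getNumRecordsIn().inc(1L);
    metricData.getNumBytesIn().inc(record.value().toString().getBytes(StandardCharsets.UTF_8).length);
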
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/pom.xml b/inlong-sort/sort-connectors/mongodb-cdc/pom.xml
index 5ffda58ab..f4d3e6036 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/pom.xml
+++ b/inlong-sort/sort-connectors/mongodb-cdc/pom.xml
@@ -36,6 +36,11 @@
             <groupId>com.ververica</groupId>
             <artifactId>flink-connector-mongodb-cdc</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
@@ -53,6 +58,7 @@
                         <configuration>
                             <artifactSet>
                                 <includes>
+                                    <include>org.apache.inlong:*</include>
                                     <include>io.debezium:debezium-api</include>
                                     <include>io.debezium:debezium-embedded</include>
                                     <include>io.debezium:debezium-core</include>
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
similarity index 95%
copy from inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
copy to inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
index 157cdb3e6..141cc5672 100644
--- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.cdc.postgres;
+package org.apache.inlong.sort.cdc.mongodb;
 
 import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.Validator;
@@ -58,6 +58,7 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.sort.base.Constants;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
@@ -78,10 +79,6 @@ import java.util.concurrent.TimeUnit;
 
 import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
 import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
-import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
-import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND;
-import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
-import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
 
 /**
  * The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data
@@ -115,33 +112,40 @@ import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
 public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
         implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<T> {
 
+    private static final long serialVersionUID = -5808108641062931623L;
+
+    protected static final Logger LOG = LoggerFactory.getLogger(DebeziumSourceFunction.class);
+
     /**
      * State name of the consumer's partition offset states.
      */
     public static final String OFFSETS_STATE_NAME = "offset-states";
+
     /**
      * State name of the consumer's history records state.
      */
     public static final String HISTORY_RECORDS_STATE_NAME = "history-records-states";
+
     /**
      * The maximum number of pending non-committed checkpoints to track, to avoid memory leaks.
      */
     public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
+
     /**
      * The configuration represents the Debezium MySQL Connector uses the legacy implementation or
      * not.
      */
     public static final String LEGACY_IMPLEMENTATION_KEY = "internal.implementation";
+
     /**
      * The configuration value represents legacy implementation.
      */
     public static final String LEGACY_IMPLEMENTATION_VALUE = "legacy";
-    protected static final Logger LOG = LoggerFactory.getLogger(DebeziumSourceFunction.class);
-    private static final long serialVersionUID = -5808108641062931623L;
 
     // ---------------------------------------------------------------------------------------
     // Properties
     // ---------------------------------------------------------------------------------------
+
     /**
      * The schema to convert from Debezium's messages into Flink's objects.
      */
@@ -162,18 +166,21 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
      * Data for pending but uncommitted offsets.
      */
     private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
-    /**
-     * Validator to validate the connected database satisfies the cdc connector's requirements.
-     */
-    private final Validator validator;
+
     /**
      * Flag indicating whether the Debezium Engine is started.
      */
     private volatile boolean debeziumStarted = false;
 
+    /**
+     * Validator to validate the connected database satisfies the cdc connector's requirements.
+     */
+    private final Validator validator;
+
     // ---------------------------------------------------------------------------------------
     // State
     // ---------------------------------------------------------------------------------------
+
     /**
      * The offsets to restore to, if the consumer restores state from a checkpoint.
      *
@@ -226,7 +233,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
 
     private String inlongMetric;
 
-    private SourceMetricData sourceMetricData;
+    private SourceMetricData metricData;
 
     // ---------------------------------------------------------------------------------------
 
@@ -414,17 +421,17 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
         metricGroup.gauge(
                 "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
         if (StringUtils.isNotEmpty(this.inlongMetric)) {
-            String[] inlongMetricArray = inlongMetric.split("_");
+            String[] inlongMetricArray = inlongMetric.split(Constants.DELIMITER);
             String groupId = inlongMetricArray[0];
             String streamId = inlongMetricArray[1];
             String nodeId = inlongMetricArray[2];
-            sourceMetricData = new SourceMetricData(metricGroup);
-            sourceMetricData.registerMetricsForNumRecordsIn(groupId, streamId, nodeId, NUM_RECORDS_IN);
-            sourceMetricData.registerMetricsForNumBytesIn(groupId, streamId, nodeId, NUM_BYTES_IN);
-            sourceMetricData.registerMetricsForNumBytesInPerSecond(groupId, streamId,
-                    nodeId, NUM_BYTES_IN_PER_SECOND);
-            sourceMetricData.registerMetricsForNumRecordsInPerSecond(groupId, streamId,
-                    nodeId, NUM_RECORDS_IN_PER_SECOND);
+            metricData = new SourceMetricData(metricGroup);
+            metricData.registerMetricsForNumRecordsIn(groupId, streamId, nodeId, Constants.NUM_RECORDS_IN);
+            metricData.registerMetricsForNumBytesIn(groupId, streamId, nodeId, Constants.NUM_BYTES_IN);
+            metricData.registerMetricsForNumBytesInPerSecond(groupId, streamId, nodeId,
+                    Constants.NUM_BYTES_IN_PER_SECOND);
+            metricData.registerMetricsForNumRecordsInPerSecond(groupId, streamId, nodeId,
+                    Constants.NUM_RECORDS_IN_PER_SECOND);
         }
         properties.setProperty("name", "engine");
         properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
@@ -463,9 +470,9 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
                         new DebeziumDeserializationSchema<T>() {
                             @Override
                             public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
-                                if (sourceMetricData != null) {
-                                    sourceMetricData.getNumRecordsIn().inc(1L);
-                                    sourceMetricData.getNumBytesIn()
+                                if (metricData != null) {
+                                    metricData.getNumRecordsIn().inc(1L);
+                                    metricData.getNumBytesIn()
                                             .inc(record.value().toString().getBytes(StandardCharsets.UTF_8).length);
                                 }
                                 deserializer.deserialize(record, out);
@@ -639,6 +646,6 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
     }
 
     public SourceMetricData getMetricData() {
-        return sourceMetricData;
+        return metricData;
     }
 }
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java
new file mode 100644
index 000000000..ebe9bf4b5
--- /dev/null
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.mongodb;
+
+import com.mongodb.ConnectionString;
+import com.mongodb.client.model.changestream.FullDocument;
+import com.mongodb.kafka.connect.source.MongoSourceConfig;
+import com.mongodb.kafka.connect.source.MongoSourceConfig.ErrorTolerance;
+import com.mongodb.kafka.connect.source.MongoSourceConfig.OutputFormat;
+import com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceConnector;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.Validator;
+import io.debezium.heartbeat.Heartbeat;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
+
+import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.COLLECTION_INCLUDE_LIST;
+import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.DATABASE_INCLUDE_LIST;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A builder to build a SourceFunction which can read snapshot and continue to consume change stream
+ * events.
+ */
+@PublicEvolving
+public class MongoDBSource {
+
+    public static final String MONGODB_SCHEME = "mongodb";
+
+    public static final String ERROR_TOLERANCE_NONE = ErrorTolerance.NONE.value();
+
+    public static final String ERROR_TOLERANCE_ALL = ErrorTolerance.ALL.value();
+
+    public static final String FULL_DOCUMENT_UPDATE_LOOKUP = FullDocument.UPDATE_LOOKUP.getValue();
+
+    public static final int POLL_MAX_BATCH_SIZE_DEFAULT = 1000;
+
+    public static final int POLL_AWAIT_TIME_MILLIS_DEFAULT = 1500;
+
+    public static final String HEARTBEAT_TOPIC_NAME_DEFAULT = "__mongodb_heartbeats";
+
+    public static final String OUTPUT_FORMAT_SCHEMA =
+            OutputFormat.SCHEMA.name().toLowerCase(Locale.ROOT);
+
+    // Add "source" field to adapt to debezium SourceRecord
+    public static final String OUTPUT_SCHEMA_VALUE_DEFAULT =
+            "{"
+                    + "  \"name\": \"ChangeStream\","
+                    + "  \"type\": \"record\","
+                    + "  \"fields\": ["
+                    + "    { \"name\": \"_id\", \"type\": \"string\" },"
+                    + "    { \"name\": \"operationType\", \"type\": [\"string\", \"null\"] },"
+                    + "    { \"name\": \"fullDocument\", \"type\": [\"string\", \"null\"] },"
+                    + "    { \"name\": \"source\","
+                    + "      \"type\": [{\"name\": \"source\", \"type\": \"record\", \"fields\": ["
+                    + "                {\"name\": \"ts_ms\", \"type\": \"long\"},"
+                    + "                {\"name\": \"snapshot\", \"type\": [\"string\", \"null\"] } ]"
+                    + "               }, \"null\" ] },"
+                    + "    { \"name\": \"ns\","
+                    + "      \"type\": [{\"name\": \"ns\", \"type\": \"record\", \"fields\": ["
+                    + "                {\"name\": \"db\", \"type\": \"string\"},"
+                    + "                {\"name\": \"coll\", \"type\": [\"string\", \"null\"] } ]"
+                    + "               }, \"null\" ] },"
+                    + "    { \"name\": \"to\","
+                    + "      \"type\": [{\"name\": \"to\", \"type\": \"record\",  \"fields\": ["
+                    + "                {\"name\": \"db\", \"type\": \"string\"},"
+                    + "                {\"name\": \"coll\", \"type\": [\"string\", \"null\"] } ]"
+                    + "               }, \"null\" ] },"
+                    + "    { \"name\": \"documentKey\", \"type\": [\"string\", \"null\"] },"
+                    + "    { \"name\": \"updateDescription\","
+                    + "      \"type\": [{\"name\": \"updateDescription\",  \"type\": \"record\", \"fields\": ["
+                    + "                 {\"name\": \"updatedFields\", \"type\": [\"string\", \"null\"]},"
+                    + "                 {\"name\": \"removedFields\","
+                    + "                  \"type\": [{\"type\": \"array\", \"items\": \"string\"}, \"null\"]"
+                    + "                  }] }, \"null\"] },"
+                    + "    { \"name\": \"clusterTime\", \"type\": [\"string\", \"null\"] },"
+                    + "    { \"name\": \"txnNumber\", \"type\": [\"long\", \"null\"]},"
+                    + "    { \"name\": \"lsid\", \"type\": [{\"name\": \"lsid\", \"type\": \"record\","
+                    + "               \"fields\": [ {\"name\": \"id\", \"type\": \"string\"},"
+                    + "                             {\"name\": \"uid\", \"type\": \"string\"}] }, \"null\"] }"
+                    + "  ]"
+                    + "}";
+
+    public static <T> Builder<T> builder() {
+        return new Builder<>();
+    }
+
+    private static String encodeValue(String value) {
+        try {
+            return URLEncoder.encode(value, StandardCharsets.UTF_8.name());
+        } catch (UnsupportedEncodingException e) {
+            throw new IllegalArgumentException(e);
+        }
+    }
+
+    /** Builder class of {@link MongoDBSource}. */
+    public static class Builder<T> {
+
+        private String hosts;
+        private String username;
+        private String password;
+        private List<String> databaseList;
+        private List<String> collectionList;
+        private String connectionOptions;
+        private Integer batchSize;
+        private Integer pollAwaitTimeMillis = POLL_AWAIT_TIME_MILLIS_DEFAULT;
+        private Integer pollMaxBatchSize = POLL_MAX_BATCH_SIZE_DEFAULT;
+        private Boolean copyExisting = true;
+        private Integer copyExistingMaxThreads;
+        private Integer copyExistingQueueSize;
+        private String copyExistingPipeline;
+        private Boolean errorsLogEnable;
+        private String errorsTolerance;
+        private Integer heartbeatIntervalMillis;
+        private DebeziumDeserializationSchema<T> deserializer;
+        private String inLongMetric;
+
+        /** The comma-separated list of hostname and port pairs of MongoDB servers. */
+        public Builder<T> hosts(String hosts) {
+            this.hosts = hosts;
+            return this;
+        }
+
+        /**
+         * Ampersand (i.e. &) separated MongoDB connection options, e.g.
+         * replicaSet=test&connectTimeoutMS=300000. See
+         * https://docs.mongodb.com/manual/reference/connection-string/#std-label-connections-connection-options
+         */
+        public Builder<T> connectionOptions(String connectionOptions) {
+            this.connectionOptions = connectionOptions;
+            return this;
+        }
+
+        /** Name of the database user to be used when connecting to MongoDB. */
+        public Builder<T> username(String username) {
+            this.username = username;
+            return this;
+        }
+
+        /** Password to be used when connecting to MongoDB. */
+        public Builder<T> password(String password) {
+            this.password = password;
+            return this;
+        }
+
+        /** Regular expressions list that match database names to be monitored. */
+        public Builder<T> databaseList(String... databaseList) {
+            this.databaseList = Arrays.asList(databaseList);
+            return this;
+        }
+
+        /**
+         * Regular expressions that match fully-qualified collection identifiers for collections to
+         * be monitored. Each identifier is of the form {@code <databaseName>.<collectionName>}.
+         */
+        public Builder<T> collectionList(String... collectionList) {
+            this.collectionList = Arrays.asList(collectionList);
+            return this;
+        }
+
+        /**
+         * batch.size
+         *
+         * <p>The cursor batch size. Default: 0
+         */
+        public Builder<T> batchSize(int batchSize) {
+            checkArgument(batchSize >= 0);
+            this.batchSize = batchSize;
+            return this;
+        }
+
+        /**
+         * poll.await.time.ms
+         *
+         * <p>The amount of time to wait before checking for new results on the change stream.
+         * Default: 3000
+         */
+        public Builder<T> pollAwaitTimeMillis(int pollAwaitTimeMillis) {
+            checkArgument(pollAwaitTimeMillis > 0);
+            this.pollAwaitTimeMillis = pollAwaitTimeMillis;
+            return this;
+        }
+
+        /**
+         * poll.max.batch.size
+         *
+         * <p>Maximum number of change stream documents to include in a single batch when polling
+         * for new data. This setting can be used to limit the amount of data buffered internally in
+         * the connector. Default: 1000
+         */
+        public Builder<T> pollMaxBatchSize(int pollMaxBatchSize) {
+            checkArgument(pollMaxBatchSize > 0);
+            this.pollMaxBatchSize = pollMaxBatchSize;
+            return this;
+        }
+
+        /**
+         * copy.existing
+         *
+         * <p>Copy existing data from source collections and convert them to Change Stream events on
+         * their respective topics. Any changes to the data that occur during the copy process are
+         * applied once the copy is completed.
+         */
+        public Builder<T> copyExisting(boolean copyExisting) {
+            this.copyExisting = copyExisting;
+            return this;
+        }
+
+        /**
+         * copy.existing.max.threads
+         *
+         * <p>The number of threads to use when performing the data copy.
+         * Default: the number of processors
+         */
+        public Builder<T> copyExistingMaxThreads(int copyExistingMaxThreads) {
+            checkArgument(copyExistingMaxThreads > 0);
+            this.copyExistingMaxThreads = copyExistingMaxThreads;
+            return this;
+        }
+
+        /**
+         * copy.existing.queue.size
+         *
+         * <p>The max size of the queue to use when copying data. Default: 16000
+         */
+        public Builder<T> copyExistingQueueSize(int copyExistingQueueSize) {
+            checkArgument(copyExistingQueueSize > 0);
+            this.copyExistingQueueSize = copyExistingQueueSize;
+            return this;
+        }
+
+        /**
+         * copy.existing.pipeline, e.g. [ { "$match": { "closed": "false" } } ]
+         *
+         * <p>An array of JSON objects describing the pipeline operations to run when copying
+         * existing data. This can improve the use of indexes by the copying manager and make
+         * copying more efficient.
+         */
+        public Builder<T> copyExistingPipeline(String copyExistingPipeline) {
+            this.copyExistingPipeline = copyExistingPipeline;
+            return this;
+        }
+
+        /**
+         * errors.log.enable
+         *
+         * <p>Whether details of failed operations should be written to the log file. When set to
+         * true, both errors that are tolerated (determined by the errors.tolerance setting) and not
+         * tolerated are written. When set to false, errors that are tolerated are omitted.
+         */
+        public Builder<T> errorsLogEnable(boolean errorsLogEnable) {
+            this.errorsLogEnable = errorsLogEnable;
+            return this;
+        }
+
+        /**
+         * errors.tolerance
+         *
+         * <p>Whether to continue processing messages if an error is encountered. When set to none,
+         * the connector reports an error and blocks further processing of the rest of the records
+         * when it encounters an error. When set to all, the connector silently ignores any bad
+         * messages.
+         *
+         * <p>Default: "none" Accepted Values: "none" or "all"
+         */
+        public Builder<T> errorsTolerance(String errorsTolerance) {
+            this.errorsTolerance = errorsTolerance;
+            return this;
+        }
+
+        /**
+         * heartbeat.interval.ms
+         *
+         * <p>The length of time in milliseconds between sending heartbeat messages. Heartbeat
+         * messages contain the post batch resume token and are sent when no source records have
+         * been published in the specified interval. This improves the resumability of the connector
+         * for low volume namespaces. Use 0 to disable.
+         */
+        public Builder<T> heartbeatIntervalMillis(int heartbeatIntervalMillis) {
+            checkArgument(heartbeatIntervalMillis >= 0);
+            this.heartbeatIntervalMillis = heartbeatIntervalMillis;
+            return this;
+        }
+
+        /**
+         * The deserializer used to convert from consumed {@link
+         * org.apache.kafka.connect.source.SourceRecord}.
+         */
+        public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
+            this.deserializer = deserializer;
+            return this;
+        }
+
+        public Builder<T> inLongMetric(String inLongMetric) {
+            this.inLongMetric = inLongMetric;
+            return this;
+        }
+
+        /** Build connection uri. */
+        @VisibleForTesting
+        public ConnectionString buildConnectionUri() {
+            StringBuilder sb = new StringBuilder(MONGODB_SCHEME).append("://");
+
+            if (StringUtils.isNotEmpty(username) && StringUtils.isNotEmpty(password)) {
+                sb.append(encodeValue(username))
+                        .append(":")
+                        .append(encodeValue(password))
+                        .append("@");
+            }
+
+            sb.append(checkNotNull(hosts));
+
+            if (StringUtils.isNotEmpty(connectionOptions)) {
+                sb.append("/?").append(connectionOptions);
+            }
+
+            return new ConnectionString(sb.toString());
+        }
+
+        /**
+         * The properties of mongodb kafka connector.
+         * https://docs.mongodb.com/kafka-connector/current/kafka-source
+         */
+        public DebeziumSourceFunction<T> build() {
+            Properties props = new Properties();
+
+            props.setProperty(
+                    "connector.class", MongoDBConnectorSourceConnector.class.getCanonicalName());
+            props.setProperty("name", "mongodb_binlog_source");
+
+            ConnectionString connectionString = buildConnectionUri();
+            props.setProperty(
+                    MongoSourceConfig.CONNECTION_URI_CONFIG, String.valueOf(connectionString));
+
+            if (databaseList != null) {
+                props.setProperty(DATABASE_INCLUDE_LIST, String.join(",", databaseList));
+            }
+
+            if (collectionList != null) {
+                props.setProperty(COLLECTION_INCLUDE_LIST, String.join(",", collectionList));
+            }
+
+            props.setProperty(MongoSourceConfig.FULL_DOCUMENT_CONFIG, FULL_DOCUMENT_UPDATE_LOOKUP);
+            props.setProperty(
+                    MongoSourceConfig.PUBLISH_FULL_DOCUMENT_ONLY_CONFIG,
+                    String.valueOf(Boolean.FALSE));
+
+            props.setProperty(MongoSourceConfig.OUTPUT_FORMAT_KEY_CONFIG, OUTPUT_FORMAT_SCHEMA);
+            props.setProperty(MongoSourceConfig.OUTPUT_FORMAT_VALUE_CONFIG, OUTPUT_FORMAT_SCHEMA);
+            props.setProperty(
+                    MongoSourceConfig.OUTPUT_SCHEMA_INFER_VALUE_CONFIG,
+                    String.valueOf(Boolean.FALSE));
+            props.setProperty(
+                    MongoSourceConfig.OUTPUT_SCHEMA_VALUE_CONFIG, OUTPUT_SCHEMA_VALUE_DEFAULT);
+
+            if (batchSize != null) {
+                props.setProperty(MongoSourceConfig.BATCH_SIZE_CONFIG, String.valueOf(batchSize));
+            }
+
+            if (pollAwaitTimeMillis != null) {
+                props.setProperty(
+                        MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG,
+                        String.valueOf(pollAwaitTimeMillis));
+            }
+
+            if (pollMaxBatchSize != null) {
+                props.setProperty(
+                        MongoSourceConfig.POLL_MAX_BATCH_SIZE_CONFIG,
+                        String.valueOf(pollMaxBatchSize));
+            }
+
+            if (errorsLogEnable != null) {
+                props.setProperty(
+                        MongoSourceConfig.ERRORS_LOG_ENABLE_CONFIG,
+                        String.valueOf(errorsLogEnable));
+            }
+
+            if (errorsTolerance != null) {
+                props.setProperty(MongoSourceConfig.ERRORS_TOLERANCE_CONFIG, errorsTolerance);
+            }
+
+            if (copyExisting != null) {
+                props.setProperty(
+                        MongoSourceConfig.COPY_EXISTING_CONFIG, String.valueOf(copyExisting));
+            }
+
+            if (copyExistingMaxThreads != null) {
+                props.setProperty(
+                        MongoSourceConfig.COPY_EXISTING_MAX_THREADS_CONFIG,
+                        String.valueOf(copyExistingMaxThreads));
+            }
+
+            if (copyExistingQueueSize != null) {
+                props.setProperty(
+                        MongoSourceConfig.COPY_EXISTING_QUEUE_SIZE_CONFIG,
+                        String.valueOf(copyExistingQueueSize));
+            }
+
+            if (copyExistingPipeline != null) {
+                props.setProperty(
+                        MongoSourceConfig.COPY_EXISTING_PIPELINE_CONFIG, copyExistingPipeline);
+            }
+
+            if (heartbeatIntervalMillis != null) {
+                props.setProperty(
+                        MongoSourceConfig.HEARTBEAT_INTERVAL_MS_CONFIG,
+                        String.valueOf(heartbeatIntervalMillis));
+            }
+
+            props.setProperty(
+                    MongoSourceConfig.HEARTBEAT_TOPIC_NAME_CONFIG, HEARTBEAT_TOPIC_NAME_DEFAULT);
+
+            // Let DebeziumChangeFetcher recognize heartbeat record
+            props.setProperty(
+                    Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(), HEARTBEAT_TOPIC_NAME_DEFAULT);
+
+            return new DebeziumSourceFunction<>(
+                    deserializer, props, null, Validator.getDefaultValidator(), inLongMetric);
+        }
+    }
+}
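
A minimal sketch of wiring the new MongoDBSource builder into a DataStream job,
including the inLongMetric label string. Hosts, credentials and namespaces are
illustrative, and the Ververica JsonDebeziumDeserializationSchema is assumed to
be available on the classpath:

    DebeziumSourceFunction<String> mongoSource =
            MongoDBSource.<String>builder()
                    .hosts("localhost:27017")
                    .username("mongouser")
                    .password("mongopw")
                    .databaseList("inventory")
                    .collectionList("inventory.orders")
                    .inLongMetric("test_group&test_stream&mongodb_source_1")
                    .deserializer(new JsonDebeziumDeserializationSchema())
                    .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.addSource(mongoSource).print();
    env.execute("mongodb-cdc-inlong-metric-demo");
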
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
new file mode 100644
index 000000000..e037ef55a
--- /dev/null
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.mongodb.table;
+
+import com.ververica.cdc.connectors.mongodb.table.MongoDBConnectorDeserializationSchema;
+import com.ververica.cdc.connectors.mongodb.table.MongoDBReadableMetadata;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.table.MetadataConverter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.cdc.mongodb.DebeziumSourceFunction;
+import org.apache.inlong.sort.cdc.mongodb.MongoDBSource;
+
+import javax.annotation.Nullable;
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static com.mongodb.MongoNamespace.checkCollectionNameValidity;
+import static com.mongodb.MongoNamespace.checkDatabaseNameValidity;
+import static com.ververica.cdc.connectors.mongodb.utils.CollectionDiscoveryUtils.containsRegexMetaCharacters;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link DynamicTableSource} that describes how to create a MongoDB change stream events source
+ * from a logical description.
+ */
+public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetadata {
+
+    private final ResolvedSchema physicalSchema;
+    private final String hosts;
+    private final String connectionOptions;
+    private final String username;
+    private final String password;
+    private final String database;
+    private final String collection;
+    private final Boolean errorsLogEnable;
+    private final String errorsTolerance;
+    private final Boolean copyExisting;
+    private final String copyExistingPipeline;
+    private final Integer copyExistingMaxThreads;
+    private final Integer copyExistingQueueSize;
+    private final Integer pollMaxBatchSize;
+    private final Integer pollAwaitTimeMillis;
+    private final Integer heartbeatIntervalMillis;
+    private final ZoneId localTimeZone;
+
+    private final String inLongMetric;
+
+    // --------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // --------------------------------------------------------------------------------------------
+
+    /** Data type that describes the final output of the source. */
+    protected DataType producedDataType;
+
+    /** Metadata that is appended at the end of a physical source row. */
+    protected List<String> metadataKeys;
+
+    public MongoDBTableSource(
+            ResolvedSchema physicalSchema,
+            String hosts,
+            @Nullable String username,
+            @Nullable String password,
+            @Nullable String database,
+            @Nullable String collection,
+            @Nullable String connectionOptions,
+            @Nullable String errorsTolerance,
+            @Nullable Boolean errorsLogEnable,
+            @Nullable Boolean copyExisting,
+            @Nullable String copyExistingPipeline,
+            @Nullable Integer copyExistingMaxThreads,
+            @Nullable Integer copyExistingQueueSize,
+            @Nullable Integer pollMaxBatchSize,
+            @Nullable Integer pollAwaitTimeMillis,
+            @Nullable Integer heartbeatIntervalMillis,
+            ZoneId localTimeZone,
+            String inLongMetric) {
+        this.physicalSchema = physicalSchema;
+        this.hosts = checkNotNull(hosts);
+        this.username = username;
+        this.password = password;
+        this.database = database;
+        this.collection = collection;
+        this.connectionOptions = connectionOptions;
+        this.errorsTolerance = errorsTolerance;
+        this.errorsLogEnable = errorsLogEnable;
+        this.copyExisting = copyExisting;
+        this.copyExistingPipeline = copyExistingPipeline;
+        this.copyExistingMaxThreads = copyExistingMaxThreads;
+        this.copyExistingQueueSize = copyExistingQueueSize;
+        this.pollMaxBatchSize = pollMaxBatchSize;
+        this.pollAwaitTimeMillis = pollAwaitTimeMillis;
+        this.heartbeatIntervalMillis = heartbeatIntervalMillis;
+        this.localTimeZone = localTimeZone;
+        this.producedDataType = physicalSchema.toPhysicalRowDataType();
+        this.metadataKeys = Collections.emptyList();
+        this.inLongMetric = inLongMetric;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return ChangelogMode.newBuilder()
+                .addContainedKind(RowKind.INSERT)
+                .addContainedKind(RowKind.UPDATE_AFTER)
+                .addContainedKind(RowKind.DELETE)
+                .build();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+        RowType physicalDataType =
+                (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
+        MetadataConverter[] metadataConverters = getMetadataConverters();
+        TypeInformation<RowData> typeInfo = scanContext.createTypeInformation(producedDataType);
+
+        DebeziumDeserializationSchema<RowData> deserializer =
+                new MongoDBConnectorDeserializationSchema(
+                        physicalDataType, metadataConverters, typeInfo, localTimeZone);
+
+        MongoDBSource.Builder<RowData> builder =
+                MongoDBSource.<RowData>builder().hosts(hosts).deserializer(deserializer);
+
+        if (StringUtils.isNotEmpty(database) && StringUtils.isNotEmpty(collection)) {
+            // explicitly specified database and collection.
+            if (!containsRegexMetaCharacters(database)
+                    && !containsRegexMetaCharacters(collection)) {
+                checkDatabaseNameValidity(database);
+                checkCollectionNameValidity(collection);
+                builder.databaseList(database);
+                builder.collectionList(database + "." + collection);
+            } else {
+                builder.databaseList(database);
+                builder.collectionList(collection);
+            }
+        } else if (StringUtils.isNotEmpty(database)) {
+            builder.databaseList(database);
+        } else if (StringUtils.isNotEmpty(collection)) {
+            builder.collectionList(collection);
+        } else {
+                // Watch all changes on the cluster by default; nothing to do here
+        }
+
+        Optional.ofNullable(username).ifPresent(builder::username);
+        Optional.ofNullable(password).ifPresent(builder::password);
+        Optional.ofNullable(connectionOptions).ifPresent(builder::connectionOptions);
+        Optional.ofNullable(errorsLogEnable).ifPresent(builder::errorsLogEnable);
+        Optional.ofNullable(errorsTolerance).ifPresent(builder::errorsTolerance);
+        Optional.ofNullable(copyExisting).ifPresent(builder::copyExisting);
+        Optional.ofNullable(copyExistingPipeline).ifPresent(builder::copyExistingPipeline);
+        Optional.ofNullable(copyExistingMaxThreads).ifPresent(builder::copyExistingMaxThreads);
+        Optional.ofNullable(copyExistingQueueSize).ifPresent(builder::copyExistingQueueSize);
+        Optional.ofNullable(pollMaxBatchSize).ifPresent(builder::pollMaxBatchSize);
+        Optional.ofNullable(pollAwaitTimeMillis).ifPresent(builder::pollAwaitTimeMillis);
+        Optional.ofNullable(heartbeatIntervalMillis).ifPresent(builder::heartbeatIntervalMillis);
+        Optional.ofNullable(inLongMetric).ifPresent(builder::inLongMetric);
+
+        DebeziumSourceFunction<RowData> sourceFunction = builder.build();
+
+        return SourceFunctionProvider.of(sourceFunction, false);
+    }
+
+    protected MetadataConverter[] getMetadataConverters() {
+        if (metadataKeys.isEmpty()) {
+            return new MetadataConverter[0];
+        }
+
+        return metadataKeys.stream()
+                .map(
+                        key ->
+                                Stream.of(MongoDBReadableMetadata.values())
+                                        .filter(m -> m.getKey().equals(key))
+                                        .findFirst()
+                                        .orElseThrow(IllegalStateException::new))
+                .map(MongoDBReadableMetadata::getConverter)
+                .toArray(MetadataConverter[]::new);
+    }
+
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        return Stream.of(MongoDBReadableMetadata.values())
+                .collect(
+                        Collectors.toMap(
+                                MongoDBReadableMetadata::getKey,
+                                MongoDBReadableMetadata::getDataType));
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+        this.metadataKeys = metadataKeys;
+        this.producedDataType = producedDataType;
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        MongoDBTableSource source =
+                new MongoDBTableSource(
+                        physicalSchema,
+                        hosts,
+                        username,
+                        password,
+                        database,
+                        collection,
+                        connectionOptions,
+                        errorsTolerance,
+                        errorsLogEnable,
+                        copyExisting,
+                        copyExistingPipeline,
+                        copyExistingMaxThreads,
+                        copyExistingQueueSize,
+                        pollMaxBatchSize,
+                        pollAwaitTimeMillis,
+                        heartbeatIntervalMillis,
+                        localTimeZone,
+                        inLongMetric);
+        source.metadataKeys = metadataKeys;
+        source.producedDataType = producedDataType;
+        return source;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        MongoDBTableSource that = (MongoDBTableSource) o;
+        return Objects.equals(physicalSchema, that.physicalSchema)
+                && Objects.equals(hosts, that.hosts)
+                && Objects.equals(username, that.username)
+                && Objects.equals(password, that.password)
+                && Objects.equals(database, that.database)
+                && Objects.equals(collection, that.collection)
+                && Objects.equals(connectionOptions, that.connectionOptions)
+                && Objects.equals(errorsTolerance, that.errorsTolerance)
+                && Objects.equals(errorsLogEnable, that.errorsLogEnable)
+                && Objects.equals(copyExisting, that.copyExisting)
+                && Objects.equals(copyExistingPipeline, that.copyExistingPipeline)
+                && Objects.equals(copyExistingMaxThreads, that.copyExistingMaxThreads)
+                && Objects.equals(copyExistingQueueSize, that.copyExistingQueueSize)
+                && Objects.equals(pollMaxBatchSize, that.pollMaxBatchSize)
+                && Objects.equals(pollAwaitTimeMillis, that.pollAwaitTimeMillis)
+                && Objects.equals(heartbeatIntervalMillis, that.heartbeatIntervalMillis)
+                && Objects.equals(localTimeZone, that.localTimeZone)
+                && Objects.equals(producedDataType, that.producedDataType)
+                && Objects.equals(metadataKeys, that.metadataKeys)
+                && Objects.equals(inLongMetric, that.inLongMetric);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                physicalSchema,
+                hosts,
+                username,
+                password,
+                database,
+                collection,
+                connectionOptions,
+                errorsTolerance,
+                errorsLogEnable,
+                copyExisting,
+                copyExistingPipeline,
+                copyExistingMaxThreads,
+                copyExistingQueueSize,
+                pollMaxBatchSize,
+                pollAwaitTimeMillis,
+                heartbeatIntervalMillis,
+                localTimeZone,
+                producedDataType,
+                metadataKeys,
+                inLongMetric);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "MongoDB-CDC";
+    }
+}
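
The factory further below registers this table source under the identifier
"mongodb-cdc-inlong" and exposes an 'inlong.metric' option whose value is
GROUP ID + '&' + STREAM ID + '&' + NODE ID. A hedged SQL-API sketch that uses
only option keys visible in this patch; the schema, endpoint and credentials are
illustrative:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    tableEnv.executeSql(
            "CREATE TABLE mongodb_orders (\n"
                    + "  _id STRING,\n"
                    + "  order_id INT,\n"
                    + "  PRIMARY KEY (_id) NOT ENFORCED\n"
                    + ") WITH (\n"
                    + "  'connector' = 'mongodb-cdc-inlong',\n"
                    + "  'hosts' = 'localhost:27017',\n"
                    + "  'username' = 'mongouser',\n"
                    + "  'password' = 'mongopw',\n"
                    + "  'database' = 'inventory',\n"
                    + "  'inlong.metric' = 'test_group&test_stream&mongodb_source_1'\n"
                    + ")");
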
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
new file mode 100644
index 000000000..d7299cf19
--- /dev/null
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.mongodb.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.time.ZoneId;
+import java.util.HashSet;
+import java.util.Set;
+
+import static com.ververica.cdc.connectors.mongodb.MongoDBSource.ERROR_TOLERANCE_NONE;
+import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_AWAIT_TIME_MILLIS_DEFAULT;
+import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_MAX_BATCH_SIZE_DEFAULT;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Factory for creating configured instance of {@link MongoDBTableSource}.
+ */
+public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
+
+    private static final String IDENTIFIER = "mongodb-cdc-inlong";
+
+    private static final String DOCUMENT_ID_FIELD = "_id";
+
+    public static final ConfigOption<String> INLONG_METRIC =
+            ConfigOptions.key("inlong.metric")
+                    .stringType()
+                    .defaultValue("")
+                    .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID");
+
+    private static final ConfigOption<String> HOSTS =
+            ConfigOptions.key("hosts")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The comma-separated list of hostname and port pairs of the MongoDB servers. "
+                                    + "e.g. localhost:27017,localhost:27018");
+
+    private static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the database user to be used when connecting to MongoDB. "
+                                    + "This is required only when MongoDB is configured to use authentication.");
+
+    private static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to be used when connecting to MongoDB. "
+                                    + "This is required only when MongoDB is configured to use authentication.");
+
+    private static final ConfigOption<String> DATABASE =
+            ConfigOptions.key("database")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the database to watch for changes. "
+                                    + "The database name also supports regular expressions "
+                                    + "to monitor multiple databases that match the expression. "
+                                    + "e.g. db[0-9] .");
+
+    private static final ConfigOption<String> COLLECTION =
+            ConfigOptions.key("collection")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the collection in the database to watch for changes. "
+                                    + "The collection name also supports regular expressions "
+                                    + "to monitor multiple collections that match fully-qualified collection identifiers. "
+                                    + "e.g. db0\\.coll[0-9] .");
+
+    private static final ConfigOption<String> CONNECTION_OPTIONS =
+            ConfigOptions.key("connection.options")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The ampersand-separated MongoDB connection options. "
+                                    + "e.g. replicaSet=test&connectTimeoutMS=300000");
+
+    private static final ConfigOption<String> ERRORS_TOLERANCE =
+            ConfigOptions.key("errors.tolerance")
+                    .stringType()
+                    .defaultValue(ERROR_TOLERANCE_NONE)
+                    .withDescription(
+                            "Whether to continue processing messages if an error is encountered. "
+                                    + "When set to none, the connector reports an error and blocks further processing "
+                                    + "of the rest of the records when it encounters an error. "
+                                    + "When set to all, the connector silently ignores any bad messages. "
+                                    + "Accepted values: 'none' or 'all'. Default 'none'.");
+
+    private static final ConfigOption<Boolean> ERRORS_LOG_ENABLE =
+            ConfigOptions.key("errors.log.enable")
+                    .booleanType()
+                    .defaultValue(Boolean.TRUE)
+                    .withDescription(
+                            "Whether details of failed operations should be written to the log file. "
+                                    + "When set to true, both errors that are tolerated (determined by the errors"
+                                    + ".tolerance setting) "
+                                    + "and not tolerated are written. When set to false, errors that are tolerated "
+                                    + "are omitted.");
+
+    private static final ConfigOption<Boolean> COPY_EXISTING =
+            ConfigOptions.key("copy.existing")
+                    .booleanType()
+                    .defaultValue(Boolean.TRUE)
+                    .withDescription(
+                            "Copy existing data from source collections and convert them "
+                                    + "to Change Stream events on their respective topics. Any changes to the data "
+                                    + "that occur during the copy process are applied once the copy is completed.");
+
+    private static final ConfigOption<String> COPY_EXISTING_PIPELINE =
+            ConfigOptions.key("copy.existing.pipeline")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "An array of JSON objects describing the pipeline operations "
+                                    + "to run when copying existing data. "
+                                    + "This can improve the use of indexes by the copying manager and make copying "
+                                    + "more efficient.");
+
+    private static final ConfigOption<Integer> COPY_EXISTING_MAX_THREADS =
+            ConfigOptions.key("copy.existing.max.threads")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The number of threads to use when performing the data copy."
+                                    + " Defaults to the number of processors.");
+
+    private static final ConfigOption<Integer> COPY_EXISTING_QUEUE_SIZE =
+            ConfigOptions.key("copy.existing.queue.size")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The max size of the queue to use when copying data. Defaults to 16000.");
+
+    private static final ConfigOption<Integer> POLL_MAX_BATCH_SIZE =
+            ConfigOptions.key("poll.max.batch.size")
+                    .intType()
+                    .defaultValue(POLL_MAX_BATCH_SIZE_DEFAULT)
+                    .withDescription(
+                            "Maximum number of change stream documents "
+                                    + "to include in a single batch when polling for new data. "
+                                    + "This setting can be used to limit the amount of data buffered internally in "
+                                    + "the connector. "
+                                    + "Defaults to 1000.");
+
+    private static final ConfigOption<Integer> POLL_AWAIT_TIME_MILLIS =
+            ConfigOptions.key("poll.await.time.ms")
+                    .intType()
+                    .defaultValue(POLL_AWAIT_TIME_MILLIS_DEFAULT)
+                    .withDescription(
+                            "The amount of time to wait before checking for new results on the change stream. "
+                                    + "Defaults to 1500.");
+
+    private static final ConfigOption<Integer> HEARTBEAT_INTERVAL_MILLIS =
+            ConfigOptions.key("heartbeat.interval.ms")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The length of time in milliseconds between sending heartbeat messages. "
+                                    + "Heartbeat messages contain the post batch resume token and are sent when no "
+                                    + "source records "
+                                    + "have been published in the specified interval. This improves the resumability "
+                                    + "of the connector "
+                                    + "for low volume namespaces. Use 0 to disable. Defaults to 0.");
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        final FactoryUtil.TableFactoryHelper helper =
+                FactoryUtil.createTableFactoryHelper(this, context);
+        helper.validate();
+
+        final ReadableConfig config = helper.getOptions();
+
+        String hosts = config.get(HOSTS);
+        String connectionOptions = config.getOptional(CONNECTION_OPTIONS).orElse(null);
+
+        String username = config.getOptional(USERNAME).orElse(null);
+        String password = config.getOptional(PASSWORD).orElse(null);
+
+        String database = config.getOptional(DATABASE).orElse(null);
+        String collection = config.getOptional(COLLECTION).orElse(null);
+
+        String errorsTolerance = config.get(ERRORS_TOLERANCE);
+        Boolean errorsLogEnable = config.get(ERRORS_LOG_ENABLE);
+
+        Integer pollMaxBatchSize = config.get(POLL_MAX_BATCH_SIZE);
+        Integer pollAwaitTimeMillis = config.get(POLL_AWAIT_TIME_MILLIS);
+
+        Integer heartbeatIntervalMillis =
+                config.getOptional(HEARTBEAT_INTERVAL_MILLIS).orElse(null);
+
+        Boolean copyExisting = config.get(COPY_EXISTING);
+        String copyExistingPipeline = config.getOptional(COPY_EXISTING_PIPELINE).orElse(null);
+        Integer copyExistingMaxThreads = config.getOptional(COPY_EXISTING_MAX_THREADS).orElse(null);
+        Integer copyExistingQueueSize = config.getOptional(COPY_EXISTING_QUEUE_SIZE).orElse(null);
+
+        String zoneId = context.getConfiguration().get(TableConfigOptions.LOCAL_TIME_ZONE);
+        ZoneId localTimeZone =
+                TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zoneId)
+                        ? ZoneId.systemDefault()
+                        : ZoneId.of(zoneId);
+        String inLongMetric = config.get(INLONG_METRIC);
+
+        ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
+        checkArgument(physicalSchema.getPrimaryKey().isPresent(), "Primary key must be present");
+        checkPrimaryKey(physicalSchema.getPrimaryKey().get(), "Primary key must be _id field");
+
+        return new MongoDBTableSource(
+                physicalSchema,
+                hosts,
+                username,
+                password,
+                database,
+                collection,
+                connectionOptions,
+                errorsTolerance,
+                errorsLogEnable,
+                copyExisting,
+                copyExistingPipeline,
+                copyExistingMaxThreads,
+                copyExistingQueueSize,
+                pollMaxBatchSize,
+                pollAwaitTimeMillis,
+                heartbeatIntervalMillis,
+                localTimeZone,
+                inLongMetric);
+    }
+
+    private void checkPrimaryKey(UniqueConstraint pk, String message) {
+        checkArgument(
+                pk.getColumns().size() == 1 && pk.getColumns().contains(DOCUMENT_ID_FIELD),
+                message);
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(HOSTS);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(USERNAME);
+        options.add(PASSWORD);
+        options.add(CONNECTION_OPTIONS);
+        options.add(DATABASE);
+        options.add(COLLECTION);
+        options.add(ERRORS_TOLERANCE);
+        options.add(ERRORS_LOG_ENABLE);
+        options.add(COPY_EXISTING);
+        options.add(COPY_EXISTING_PIPELINE);
+        options.add(COPY_EXISTING_MAX_THREADS);
+        options.add(COPY_EXISTING_QUEUE_SIZE);
+        options.add(POLL_MAX_BATCH_SIZE);
+        options.add(POLL_AWAIT_TIME_MILLIS);
+        options.add(HEARTBEAT_INTERVAL_MILLIS);
+        options.add(INLONG_METRIC);
+        return options;
+    }
+}
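
For context, a minimal usage sketch (not part of this commit) of how the options defined by the factory above could be supplied from a Flink job. The connector identifier 'mongodb-cdc-inlong', the '_id' primary-key requirement, and the option names come from the factory itself; the host, credentials, database, collection, and metric label are placeholder values.

// Hypothetical usage sketch for the "mongodb-cdc-inlong" factory above.
// Host, credentials, database, collection and the inlong.metric label are placeholders.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MongoDBCdcTableExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql(
                "CREATE TABLE mongo_orders (\n"
                        + "  _id STRING,\n"
                        + "  amount DECIMAL(10, 2),\n"
                        + "  PRIMARY KEY (_id) NOT ENFORCED\n" // the factory requires _id as the primary key
                        + ") WITH (\n"
                        + "  'connector' = 'mongodb-cdc-inlong',\n"
                        + "  'hosts' = 'localhost:27017',\n"
                        + "  'username' = 'flinkuser',\n"
                        + "  'password' = 'flinkpw',\n"
                        + "  'database' = 'inventory',\n"
                        + "  'collection' = 'orders',\n"
                        + "  'inlong.metric' = 'groupId&streamId&nodeId'\n"
                        + ")");

        tableEnv.executeSql("SELECT * FROM mongo_orders").print();
    }
}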
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-connectors/mongodb-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..9f9de2b35
--- /dev/null
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.inlong.sort.cdc.mongodb.table.MongoDBTableSourceFactory
\ No newline at end of file
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
index 3bdf92831..3724d2a8e 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
@@ -145,7 +145,7 @@ public class MySqlSource<T>
                 configFactory.createConfig(readerContext.getIndexOfSubtask());
         String inlongMetric = sourceConfig.getInlongMetric();
         if (StringUtils.isNotEmpty(inlongMetric)) {
-            String[] inlongMetricArray = inlongMetric.split("_");
+            String[] inlongMetricArray = inlongMetric.split("&");
             String groupId = inlongMetricArray[0];
             String streamId = inlongMetricArray[1];
             String nodeId = inlongMetricArray[2];
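
A small illustrative snippet (not part of this commit) showing the label format implied by changing the delimiter from '_' to '&'. The group, stream, and node names are placeholders; the parsing mirrors the split shown in the hunk above and uses the DELIMITER constant added to Constants.

// Illustrative only: composes and parses an inlong.metric label of the form
// "GROUP&STREAM&NODE". The three identifiers below are placeholder values.
import org.apache.inlong.sort.base.Constants;

public class InlongMetricLabelExample {
    public static void main(String[] args) {
        String inlongMetric = "exampleGroup" + Constants.DELIMITER
                + "exampleStream" + Constants.DELIMITER
                + "exampleNode";

        // Mirrors the parsing done in MySqlSource / DebeziumSourceFunction.
        String[] parts = inlongMetric.split(Constants.DELIMITER);
        String groupId = parts[0];   // "exampleGroup"
        String streamId = parts[1];  // "exampleStream"
        String nodeId = parts[2];    // "exampleNode"
        System.out.printf("group=%s stream=%s node=%s%n", groupId, streamId, nodeId);
    }
}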
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
index f936ac8d1..ca32dd4a9 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
@@ -34,7 +34,7 @@ public class MySqlSourceOptions {
             ConfigOptions.key("inlong.metric")
                     .stringType()
                     .defaultValue("")
-                    .withDescription("INLONG GROUP ID + '_' + STREAM ID + '_' + NODE ID");
+                    .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID");
 
     public static final ConfigOption<String> HOSTNAME =
             ConfigOptions.key("hostname")
diff --git a/inlong-sort/sort-connectors/oracle-cdc/pom.xml b/inlong-sort/sort-connectors/oracle-cdc/pom.xml
index 1db604385..d31530e64 100644
--- a/inlong-sort/sort-connectors/oracle-cdc/pom.xml
+++ b/inlong-sort/sort-connectors/oracle-cdc/pom.xml
@@ -39,6 +39,11 @@
             <groupId>com.oracle.database.jdbc</groupId>
             <artifactId>ojdbc8</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
@@ -56,6 +61,7 @@
                         <configuration>
                             <artifactSet>
                                 <includes>
+                                    <include>org.apache.inlong:*</include>
                                     <include>io.debezium:debezium-api</include>
                                     <include>io.debezium:debezium-embedded</include>
                                     <include>io.debezium:debezium-core</include>
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java
similarity index 95%
copy from inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
copy to inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java
index 157cdb3e6..166f7f1b7 100644
--- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.cdc.postgres;
+package org.apache.inlong.sort.cdc.oracle;
 
 import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.Validator;
@@ -58,6 +58,7 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.sort.base.Constants;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
@@ -78,10 +79,6 @@ import java.util.concurrent.TimeUnit;
 
 import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
 import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
-import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
-import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND;
-import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
-import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
 
 /**
  * The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data
@@ -115,33 +112,40 @@ import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
 public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
         implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<T> {
 
+    private static final long serialVersionUID = -5808108641062931623L;
+
+    protected static final Logger LOG = LoggerFactory.getLogger(DebeziumSourceFunction.class);
+
     /**
      * State name of the consumer's partition offset states.
      */
     public static final String OFFSETS_STATE_NAME = "offset-states";
+
     /**
      * State name of the consumer's history records state.
      */
     public static final String HISTORY_RECORDS_STATE_NAME = "history-records-states";
+
     /**
      * The maximum number of pending non-committed checkpoints to track, to avoid memory leaks.
      */
     public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
+
     /**
      * The configuration represents the Debezium MySQL Connector uses the legacy implementation or
      * not.
      */
     public static final String LEGACY_IMPLEMENTATION_KEY = "internal.implementation";
+
     /**
      * The configuration value represents legacy implementation.
      */
     public static final String LEGACY_IMPLEMENTATION_VALUE = "legacy";
-    protected static final Logger LOG = LoggerFactory.getLogger(DebeziumSourceFunction.class);
-    private static final long serialVersionUID = -5808108641062931623L;
 
     // ---------------------------------------------------------------------------------------
     // Properties
     // ---------------------------------------------------------------------------------------
+
     /**
      * The schema to convert from Debezium's messages into Flink's objects.
      */
@@ -162,18 +166,21 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
      * Data for pending but uncommitted offsets.
      */
     private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
-    /**
-     * Validator to validate the connected database satisfies the cdc connector's requirements.
-     */
-    private final Validator validator;
+
     /**
      * Flag indicating whether the Debezium Engine is started.
      */
     private volatile boolean debeziumStarted = false;
 
+    /**
+     * Validator to validate the connected database satisfies the cdc connector's requirements.
+     */
+    private final Validator validator;
+
     // ---------------------------------------------------------------------------------------
     // State
     // ---------------------------------------------------------------------------------------
+
     /**
      * The offsets to restore to, if the consumer restores state from a checkpoint.
      *
@@ -226,7 +233,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
 
     private String inlongMetric;
 
-    private SourceMetricData sourceMetricData;
+    private SourceMetricData metricData;
 
     // ---------------------------------------------------------------------------------------
 
@@ -414,17 +421,17 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
         metricGroup.gauge(
                 "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
         if (StringUtils.isNotEmpty(this.inlongMetric)) {
-            String[] inlongMetricArray = inlongMetric.split("_");
+            String[] inlongMetricArray = inlongMetric.split(Constants.DELIMITER);
             String groupId = inlongMetricArray[0];
             String streamId = inlongMetricArray[1];
             String nodeId = inlongMetricArray[2];
-            sourceMetricData = new SourceMetricData(metricGroup);
-            sourceMetricData.registerMetricsForNumRecordsIn(groupId, streamId, nodeId, NUM_RECORDS_IN);
-            sourceMetricData.registerMetricsForNumBytesIn(groupId, streamId, nodeId, NUM_BYTES_IN);
-            sourceMetricData.registerMetricsForNumBytesInPerSecond(groupId, streamId,
-                    nodeId, NUM_BYTES_IN_PER_SECOND);
-            sourceMetricData.registerMetricsForNumRecordsInPerSecond(groupId, streamId,
-                    nodeId, NUM_RECORDS_IN_PER_SECOND);
+            metricData = new SourceMetricData(metricGroup);
+            metricData.registerMetricsForNumRecordsIn(groupId, streamId, nodeId, Constants.NUM_RECORDS_IN);
+            metricData.registerMetricsForNumBytesIn(groupId, streamId, nodeId, Constants.NUM_BYTES_IN);
+            metricData.registerMetricsForNumBytesInPerSecond(groupId, streamId, nodeId,
+                    Constants.NUM_BYTES_IN_PER_SECOND);
+            metricData.registerMetricsForNumRecordsInPerSecond(groupId, streamId, nodeId,
+                    Constants.NUM_RECORDS_IN_PER_SECOND);
         }
         properties.setProperty("name", "engine");
         properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
@@ -463,9 +470,9 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
                         new DebeziumDeserializationSchema<T>() {
                             @Override
                             public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
-                                if (sourceMetricData != null) {
-                                    sourceMetricData.getNumRecordsIn().inc(1L);
-                                    sourceMetricData.getNumBytesIn()
+                                if (metricData != null) {
+                                    metricData.getNumRecordsIn().inc(1L);
+                                    metricData.getNumBytesIn()
                                             .inc(record.value().toString().getBytes(StandardCharsets.UTF_8).length);
                                 }
                                 deserializer.deserialize(record, out);
@@ -639,6 +646,6 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
     }
 
     public SourceMetricData getMetricData() {
-        return sourceMetricData;
+        return metricData;
     }
 }
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/OracleSource.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/OracleSource.java
new file mode 100644
index 000000000..f9253f295
--- /dev/null
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/OracleSource.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.oracle;
+
+import com.ververica.cdc.connectors.oracle.OracleValidator;
+import com.ververica.cdc.connectors.oracle.table.StartupOptions;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.internal.DebeziumOffset;
+import io.debezium.connector.oracle.OracleConnector;
+
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A builder to build a SourceFunction which can read a snapshot and then continue to consume
+ * redo log changes via LogMiner.
+ */
+public class OracleSource {
+
+    private static final String DATABASE_SERVER_NAME = "oracle_logminer";
+
+    public static <T> Builder<T> builder() {
+        return new Builder<>();
+    }
+
+    /** Builder class of {@link OracleSource}. */
+    public static class Builder<T> {
+
+        private int port = 1521; // default 1521 port
+        private String hostname;
+        private String database;
+        private String username;
+        private String password;
+        private String[] tableList;
+        private String[] schemaList;
+        private Properties dbzProperties;
+        private StartupOptions startupOptions = StartupOptions.initial();
+        private DebeziumDeserializationSchema<T> deserializer;
+        private String inLongMetric;
+
+        public Builder<T> hostname(String hostname) {
+            this.hostname = hostname;
+            return this;
+        }
+
+        /** Integer port number of the Oracle database server. */
+        public Builder<T> port(int port) {
+            this.port = port;
+            return this;
+        }
+
+        /**
+         * Name of the Oracle database to connect to. It maps to the Debezium "database.dbname"
+         * property and is required.
+         */
+        public Builder<T> database(String database) {
+            this.database = database;
+            return this;
+        }
+
+        /**
+         * An optional list of regular expressions that match fully-qualified table identifiers for
+         * tables to be monitored; any table not included in the list will be excluded from
+         * monitoring. Each identifier is of the form schemaName.tableName. By default the connector
+         * will monitor every non-system table in each monitored database.
+         */
+        public Builder<T> tableList(String... tableList) {
+            this.tableList = tableList;
+            return this;
+        }
+
+        /**
+         * An optional list of regular expressions that match schema names to be monitored; any
+         * schema name not included in the whitelist will be excluded from monitoring. By default
+         * all non-system schemas will be monitored.
+         */
+        public Builder<T> schemaList(String... schemaList) {
+            this.schemaList = schemaList;
+            return this;
+        }
+
+        /** Name of the Oracle user to use when connecting to the Oracle database server. */
+        public Builder<T> username(String username) {
+            this.username = username;
+            return this;
+        }
+
+        /** Password to use when connecting to the Oracle database server. */
+        public Builder<T> password(String password) {
+            this.password = password;
+            return this;
+        }
+
+        /** The Debezium Oracle connector properties. For example, "snapshot.mode". */
+        public Builder<T> debeziumProperties(Properties properties) {
+            this.dbzProperties = properties;
+            return this;
+        }
+
+        /**
+         * The deserializer used to convert from consumed {@link
+         * org.apache.kafka.connect.source.SourceRecord}.
+         */
+        public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
+            this.deserializer = deserializer;
+            return this;
+        }
+
+        /** Specifies the startup options. */
+        public Builder<T> startupOptions(StartupOptions startupOptions) {
+            this.startupOptions = startupOptions;
+            return this;
+        }
+
+        public Builder<T> inLongMetric(String inLongMetric) {
+            this.inLongMetric = inLongMetric;
+            return this;
+        }
+
+        public DebeziumSourceFunction<T> build() {
+            Properties props = new Properties();
+            props.setProperty("connector.class", OracleConnector.class.getCanonicalName());
+            // Logical name that identifies and provides a namespace for the particular Oracle
+            // database server being monitored. The logical name should be unique across all other
+            // connectors, since it is used as a prefix for all Kafka topic names emanating from
+            // this connector. Only alphanumeric characters and underscores should be used.
+            props.setProperty("database.server.name", DATABASE_SERVER_NAME);
+            props.setProperty("database.hostname", checkNotNull(hostname));
+            props.setProperty("database.user", checkNotNull(username));
+            props.setProperty("database.password", checkNotNull(password));
+            props.setProperty("database.port", String.valueOf(port));
+            props.setProperty("database.history.skip.unparseable.ddl", String.valueOf(true));
+            props.setProperty("database.dbname", checkNotNull(database));
+            if (schemaList != null) {
+                props.setProperty("schema.whitelist", String.join(",", schemaList));
+            }
+            if (tableList != null) {
+                props.setProperty("table.include.list", String.join(",", tableList));
+            }
+
+            DebeziumOffset specificOffset = null;
+            switch (startupOptions.startupMode) {
+                case INITIAL:
+                    props.setProperty("snapshot.mode", "initial");
+                    break;
+
+                case LATEST_OFFSET:
+                    props.setProperty("snapshot.mode", "schema_only");
+                    break;
+
+                default:
+                    throw new UnsupportedOperationException();
+            }
+
+            if (dbzProperties != null) {
+                props.putAll(dbzProperties);
+            }
+
+            return new DebeziumSourceFunction<>(
+                    deserializer, props, specificOffset, new OracleValidator(props), inLongMetric);
+        }
+    }
+}
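
A hypothetical DataStream wiring of the builder above (not part of this commit). Connection parameters, schema and table names, and the metric label are placeholders; the String deserializer from the flink-cdc debezium base module is used only to keep the sketch short.

// Hypothetical wiring of OracleSource.builder() into a DataStream job.
// Hostname, credentials, schema/table names and the metric label are placeholders.
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.inlong.sort.cdc.oracle.DebeziumSourceFunction;
import org.apache.inlong.sort.cdc.oracle.OracleSource;

public class OracleCdcSourceExample {
    public static void main(String[] args) throws Exception {
        DebeziumSourceFunction<String> source =
                OracleSource.<String>builder()
                        .hostname("localhost")
                        .port(1521)
                        .database("XE")
                        .schemaList("INVENTORY")
                        .tableList("INVENTORY.PRODUCTS")
                        .username("flinkuser")
                        .password("flinkpw")
                        .inLongMetric("groupId&streamId&nodeId")
                        .deserializer(new StringDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(source).print();
        env.execute("oracle-cdc-example");
    }
}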
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSource.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSource.java
new file mode 100644
index 000000000..36eead3b7
--- /dev/null
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSource.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.oracle.table;
+
+import com.ververica.cdc.connectors.oracle.table.OracleDeserializationConverterFactory;
+import com.ververica.cdc.connectors.oracle.table.OracleReadableMetaData;
+import com.ververica.cdc.connectors.oracle.table.StartupOptions;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.table.MetadataConverter;
+import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.cdc.oracle.DebeziumSourceFunction;
+import org.apache.inlong.sort.cdc.oracle.OracleSource;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link DynamicTableSource} that describes how to create an Oracle CDC source from a logical
+ * description.
+ */
+public class OracleTableSource implements ScanTableSource, SupportsReadingMetadata {
+
+    private final ResolvedSchema physicalSchema;
+    private final int port;
+    private final String hostname;
+    private final String database;
+    private final String username;
+    private final String password;
+    private final String tableName;
+    private final String schemaName;
+    private final Properties dbzProperties;
+    private final StartupOptions startupOptions;
+    private final String inLongMetric;
+
+    // --------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // --------------------------------------------------------------------------------------------
+
+    /** Data type that describes the final output of the source. */
+    protected DataType producedDataType;
+
+    /** Metadata that is appended at the end of a physical source row. */
+    protected List<String> metadataKeys;
+
+    public OracleTableSource(
+            ResolvedSchema physicalSchema,
+            int port,
+            String hostname,
+            String database,
+            String tableName,
+            String schemaName,
+            String username,
+            String password,
+            Properties dbzProperties,
+            StartupOptions startupOptions,
+            String inLongMetric) {
+        this.physicalSchema = physicalSchema;
+        this.port = port;
+        this.hostname = checkNotNull(hostname);
+        this.database = checkNotNull(database);
+        this.tableName = checkNotNull(tableName);
+        this.schemaName = checkNotNull(schemaName);
+        this.username = checkNotNull(username);
+        this.password = checkNotNull(password);
+        this.dbzProperties = dbzProperties;
+        this.startupOptions = startupOptions;
+        this.producedDataType = physicalSchema.toPhysicalRowDataType();
+        this.metadataKeys = Collections.emptyList();
+        this.inLongMetric = inLongMetric;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return ChangelogMode.newBuilder()
+                .addContainedKind(RowKind.INSERT)
+                .addContainedKind(RowKind.UPDATE_BEFORE)
+                .addContainedKind(RowKind.UPDATE_AFTER)
+                .addContainedKind(RowKind.DELETE)
+                .build();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+        RowType physicalDataType =
+                (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
+        MetadataConverter[] metadataConverters = getMetadataConverters();
+        TypeInformation<RowData> typeInfo = scanContext.createTypeInformation(producedDataType);
+
+        DebeziumDeserializationSchema<RowData> deserializer =
+                RowDataDebeziumDeserializeSchema.newBuilder()
+                        .setPhysicalRowType(physicalDataType)
+                        .setMetadataConverters(metadataConverters)
+                        .setResultTypeInfo(typeInfo)
+                        .setUserDefinedConverterFactory(
+                                OracleDeserializationConverterFactory.instance())
+                        .build();
+        OracleSource.Builder<RowData> builder =
+                OracleSource.<RowData>builder()
+                        .hostname(hostname)
+                        .port(port)
+                        .database(database)
+                        .tableList(schemaName + "." + tableName)
+                        .schemaList(schemaName)
+                        .username(username)
+                        .password(password)
+                        .debeziumProperties(dbzProperties)
+                        .startupOptions(startupOptions)
+                        .deserializer(deserializer)
+                        .inLongMetric(inLongMetric);
+        DebeziumSourceFunction<RowData> sourceFunction = builder.build();
+
+        return SourceFunctionProvider.of(sourceFunction, false);
+    }
+
+    private MetadataConverter[] getMetadataConverters() {
+        if (metadataKeys.isEmpty()) {
+            return new MetadataConverter[0];
+        }
+
+        return metadataKeys.stream()
+                .map(
+                        key ->
+                                Stream.of(OracleReadableMetaData.values())
+                                        .filter(m -> m.getKey().equals(key))
+                                        .findFirst()
+                                        .orElseThrow(IllegalStateException::new))
+                .map(OracleReadableMetaData::getConverter)
+                .toArray(MetadataConverter[]::new);
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        OracleTableSource source =
+                new OracleTableSource(
+                        physicalSchema,
+                        port,
+                        hostname,
+                        database,
+                        tableName,
+                        schemaName,
+                        username,
+                        password,
+                        dbzProperties,
+                        startupOptions,
+                        inLongMetric);
+        source.metadataKeys = metadataKeys;
+        source.producedDataType = producedDataType;
+        return source;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        OracleTableSource that = (OracleTableSource) o;
+        return port == that.port
+                && Objects.equals(physicalSchema, that.physicalSchema)
+                && Objects.equals(hostname, that.hostname)
+                && Objects.equals(database, that.database)
+                && Objects.equals(username, that.username)
+                && Objects.equals(password, that.password)
+                && Objects.equals(tableName, that.tableName)
+                && Objects.equals(schemaName, that.schemaName)
+                && Objects.equals(dbzProperties, that.dbzProperties)
+                && Objects.equals(startupOptions, that.startupOptions)
+                && Objects.equals(producedDataType, that.producedDataType)
+                && Objects.equals(metadataKeys, that.metadataKeys)
+                && Objects.equals(inLongMetric, that.inLongMetric);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                physicalSchema,
+                port,
+                hostname,
+                database,
+                username,
+                password,
+                tableName,
+                schemaName,
+                dbzProperties,
+                startupOptions,
+                producedDataType,
+                metadataKeys,
+                inLongMetric);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "Oracle-CDC";
+    }
+
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        return Stream.of(OracleReadableMetaData.values())
+                .collect(
+                        Collectors.toMap(
+                                OracleReadableMetaData::getKey,
+                                OracleReadableMetaData::getDataType));
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+        this.metadataKeys = metadataKeys;
+        this.producedDataType = producedDataType;
+    }
+}
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
similarity index 63%
copy from inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
copy to inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
index 7826202ac..c42642a9e 100644
--- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
@@ -16,11 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.cdc.postgres.table;
+package org.apache.inlong.sort.cdc.oracle.table;
 
+import com.ververica.cdc.connectors.oracle.table.StartupOptions;
+import com.ververica.cdc.debezium.table.DebeziumOptions;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
@@ -29,120 +32,103 @@ import org.apache.flink.table.factories.FactoryUtil;
 import java.util.HashSet;
 import java.util.Set;
 
-import static com.ververica.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
 import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
 
 /**
- * Factory for creating configured instance of
- * {@link com.ververica.cdc.connectors.postgres.table.PostgreSQLTableSource}.
+ * Factory for creating configured instance of {@link OracleTableSource}.
  */
-public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
+public class OracleTableSourceFactory implements DynamicTableSourceFactory {
 
-    private static final String IDENTIFIER = "postgres-cdc-inlong";
+    private static final String IDENTIFIER = "oracle-cdc-inlong";
 
     public static final ConfigOption<String> INLONG_METRIC =
             ConfigOptions.key("inlong.metric")
                     .stringType()
                     .defaultValue("")
-                    .withDescription("INLONG GROUP ID + '_' + STREAM ID + '_' + NODE ID");
+                    .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID");
 
     private static final ConfigOption<String> HOSTNAME =
             ConfigOptions.key("hostname")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription("IP address or hostname of the PostgreSQL database server.");
+                    .withDescription("IP address or hostname of the Oracle database server.");
 
     private static final ConfigOption<Integer> PORT =
             ConfigOptions.key("port")
                     .intType()
-                    .defaultValue(5432)
-                    .withDescription("Integer port number of the PostgreSQL database server.");
+                    .defaultValue(1521)
+                    .withDescription("Integer port number of the Oracle database server.");
 
     private static final ConfigOption<String> USERNAME =
             ConfigOptions.key("username")
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "Name of the PostgreSQL database to use when connecting to the PostgreSQL database server"
-                                    + ".");
+                            "Name of the Oracle database user to use when connecting to the Oracle database server.");
 
     private static final ConfigOption<String> PASSWORD =
             ConfigOptions.key("password")
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "Password to use when connecting to the PostgreSQL database server.");
+                            "Password to use when connecting to the Oracle database server.");
 
     private static final ConfigOption<String> DATABASE_NAME =
             ConfigOptions.key("database-name")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription("Database name of the PostgreSQL server to monitor.");
+                    .withDescription("Database name of the Oracle server to monitor.");
 
     private static final ConfigOption<String> SCHEMA_NAME =
             ConfigOptions.key("schema-name")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription("Schema name of the PostgreSQL database to monitor.");
+                    .withDescription("Schema name of the Oracle database to monitor.");
 
     private static final ConfigOption<String> TABLE_NAME =
             ConfigOptions.key("table-name")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription("Table name of the PostgreSQL database to monitor.");
+                    .withDescription("Table name of the Oracle database to monitor.");
 
-    private static final ConfigOption<String> DECODING_PLUGIN_NAME =
-            ConfigOptions.key("decoding.plugin.name")
+    public static final ConfigOption<String> SCAN_STARTUP_MODE =
+            ConfigOptions.key("scan.startup.mode")
                     .stringType()
-                    .defaultValue("decoderbufs")
+                    .defaultValue("initial")
                     .withDescription(
-                            "The name of the Postgres logical decoding plug-in installed on the server.\n"
-                                    + "Supported values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming,\n"
-                                    + "wal2json_rds_streaming and pgoutput.");
-
-    private static final ConfigOption<String> SLOT_NAME =
-            ConfigOptions.key("slot.name")
-                    .stringType()
-                    .defaultValue("flink")
-                    .withDescription(
-                            "The name of the PostgreSQL logical decoding slot that was created for streaming changes "
-                                    + "from a particular plug-in for a particular database/schema. The server uses "
-                                    + "this slot "
-                                    + "to stream events to the connector that you are configuring. Default is "
-                                    + "\"flink\".");
+                            "Optional startup mode for the Oracle CDC consumer. Valid enumerations are "
+                                    + "\"initial\" and \"latest-offset\".");
 
     @Override
     public DynamicTableSource createDynamicTableSource(Context context) {
         final FactoryUtil.TableFactoryHelper helper =
                 FactoryUtil.createTableFactoryHelper(this, context);
-        helper.validateExcept(DEBEZIUM_OPTIONS_PREFIX);
+        helper.validateExcept(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX);
 
         final ReadableConfig config = helper.getOptions();
         String hostname = config.get(HOSTNAME);
         String username = config.get(USERNAME);
         String password = config.get(PASSWORD);
         String databaseName = config.get(DATABASE_NAME);
-        String schemaName = config.get(SCHEMA_NAME);
         String tableName = config.get(TABLE_NAME);
+        String schemaName = config.get(SCHEMA_NAME);
         int port = config.get(PORT);
-        String pluginName = config.get(DECODING_PLUGIN_NAME);
-        String slotName = config.get(SLOT_NAME);
+        StartupOptions startupOptions = getStartupOptions(config);
         ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
-        String inlongMetric = config.get(INLONG_METRIC);
+        String inLongMetric = config.get(INLONG_METRIC);
 
-        return new PostgreSQLTableSource(
+        return new OracleTableSource(
                 physicalSchema,
                 port,
                 hostname,
                 databaseName,
-                schemaName,
                 tableName,
+                schemaName,
                 username,
                 password,
-                pluginName,
-                slotName,
                 getDebeziumProperties(context.getCatalogTable().getOptions()),
-                inlongMetric);
+                startupOptions,
+                inLongMetric);
     }
 
     @Override
@@ -157,8 +143,8 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
         options.add(USERNAME);
         options.add(PASSWORD);
         options.add(DATABASE_NAME);
-        options.add(SCHEMA_NAME);
         options.add(TABLE_NAME);
+        options.add(SCHEMA_NAME);
         return options;
     }
 
@@ -166,9 +152,32 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
     public Set<ConfigOption<?>> optionalOptions() {
         Set<ConfigOption<?>> options = new HashSet<>();
         options.add(PORT);
-        options.add(DECODING_PLUGIN_NAME);
-        options.add(SLOT_NAME);
+        options.add(SCAN_STARTUP_MODE);
         options.add(INLONG_METRIC);
         return options;
     }
+
+    private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial";
+    private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
+
+    private static StartupOptions getStartupOptions(ReadableConfig config) {
+        String modeString = config.get(SCAN_STARTUP_MODE);
+
+        switch (modeString.toLowerCase()) {
+            case SCAN_STARTUP_MODE_VALUE_INITIAL:
+                return StartupOptions.initial();
+
+            case SCAN_STARTUP_MODE_VALUE_LATEST:
+                return StartupOptions.latest();
+
+            default:
+                throw new ValidationException(
+                        String.format(
+                                "Invalid value for option '%s'. Supported values are [%s, %s], but was: %s",
+                                SCAN_STARTUP_MODE.key(),
+                                SCAN_STARTUP_MODE_VALUE_INITIAL,
+                                SCAN_STARTUP_MODE_VALUE_LATEST,
+                                modeString));
+        }
+    }
 }
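
For reference, a hypothetical Table API sketch (not part of this commit) exercising the 'oracle-cdc-inlong' factory above, mainly to show where 'scan.startup.mode' and 'inlong.metric' fit among the other options. Connection details, schema and table names, and the metric label are placeholders.

// Hypothetical usage sketch for the "oracle-cdc-inlong" factory above.
// Connection details, schema/table names and the inlong.metric label are placeholders.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class OracleCdcTableExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql(
                "CREATE TABLE oracle_products (\n"
                        + "  ID INT,\n"
                        + "  NAME STRING,\n"
                        + "  PRIMARY KEY (ID) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'oracle-cdc-inlong',\n"
                        + "  'hostname' = 'localhost',\n"
                        + "  'port' = '1521',\n"
                        + "  'username' = 'flinkuser',\n"
                        + "  'password' = 'flinkpw',\n"
                        + "  'database-name' = 'XE',\n"
                        + "  'schema-name' = 'INVENTORY',\n"
                        + "  'table-name' = 'PRODUCTS',\n"
                        + "  'scan.startup.mode' = 'latest-offset',\n" // or 'initial' (the default)
                        + "  'inlong.metric' = 'groupId&streamId&nodeId'\n"
                        + ")");

        tableEnv.executeSql("SELECT * FROM oracle_products").print();
    }
}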
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-connectors/oracle-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..247510821
--- /dev/null
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.inlong.sort.cdc.oracle.table.OracleTableSourceFactory
\ No newline at end of file
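
The new service entry above registers the factory with Java SPI, which is how Flink's
planner discovers table factories on the classpath. A small sketch of that discovery is
shown below, using plain ServiceLoader rather than Flink's internal FactoryUtil, and
assuming the Flink table API is on the classpath:

    import java.util.ServiceLoader;
    import org.apache.flink.table.factories.Factory;

    // Lists every table factory visible via SPI, including the one registered above.
    public class FactoryDiscoverySketch {
        public static void main(String[] args) {
            for (Factory factory : ServiceLoader.load(Factory.class)) {
                System.out.println(factory.factoryIdentifier()
                        + " -> " + factory.getClass().getName());
            }
        }
    }
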
diff --git a/inlong-sort/sort-connectors/postgres-cdc/pom.xml b/inlong-sort/sort-connectors/postgres-cdc/pom.xml
index 12b2ceae8..c7c3faae9 100644
--- a/inlong-sort/sort-connectors/postgres-cdc/pom.xml
+++ b/inlong-sort/sort-connectors/postgres-cdc/pom.xml
@@ -63,6 +63,7 @@
                         <configuration>
                             <artifactSet>
                                 <includes>
+                                    <include>org.apache.inlong:*</include>
                                     <include>io.debezium:debezium-api</include>
                                     <include>io.debezium:debezium-embedded</include>
                                     <include>io.debezium:debezium-core</include>
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
index 157cdb3e6..cc02fd1e2 100644
--- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
@@ -58,6 +58,7 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.sort.base.Constants;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
@@ -414,7 +415,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
         metricGroup.gauge(
                 "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
         if (StringUtils.isNotEmpty(this.inlongMetric)) {
-            String[] inlongMetricArray = inlongMetric.split("_");
+            String[] inlongMetricArray = inlongMetric.split(Constants.DELIMITER);
             String groupId = inlongMetricArray[0];
             String streamId = inlongMetricArray[1];
             String nodeId = inlongMetricArray[2];
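
The hunk above switches the 'inlong.metric' label parsing from '_' to the shared
Constants.DELIMITER ('&'). A minimal sketch of how such a label splits into group, stream
and node ids follows; the sample value is made up for illustration:

    // Mirrors the "<groupId>&<streamId>&<nodeId>" split introduced in this commit.
    public class InlongMetricLabelSketch {
        private static final String DELIMITER = "&";

        public static void main(String[] args) {
            String inlongMetric = "exampleGroup&exampleStream&exampleNode";
            String[] parts = inlongMetric.split(DELIMITER);
            System.out.println("groupId=" + parts[0]
                    + ", streamId=" + parts[1]
                    + ", nodeId=" + parts[2]);
        }
    }
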
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
index 7826202ac..985de3208 100644
--- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
@@ -44,7 +44,7 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
             ConfigOptions.key("inlong.metric")
                     .stringType()
                     .defaultValue("")
-                    .withDescription("INLONG GROUP ID + '_' + STREAM ID + '_' + NODE ID");
+                    .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID");
 
     private static final ConfigOption<String> HOSTNAME =
             ConfigOptions.key("hostname")
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 9a86ef2ca..73bf1dcdd 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -436,6 +436,14 @@
       inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
       inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableSource.java
       inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
+      inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java
+      inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
+      inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
+      inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
+      inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/OracleSource.java
+      inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java
+      inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSource.java
+      inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
  Source  : flink-cdc-connectors 2.2.1 (Please note that the software have been modified.)
  License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
  
@@ -486,13 +494,13 @@
   Source  : flink-connector-hbase-2.2 1.13.5 (Please note that the software have been modified.)
   License : https://github.com/apache/flink/blob/master/LICENSE
 
-   1.3.6 inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java
-         inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java
-         inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkConfigOptions.java
-         inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
-         inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
-    Source  : iceberg-flink-runtime-1.13 1.13.5 (Please note that the software have been modified.)
-    License : https://github.com/apache/iceberg/LICENSE
+ 1.3.6 inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java
+       inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java
+       inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkConfigOptions.java
+       inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+       inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
+  Source  : iceberg-flink-runtime-1.13 1.13.5 (Please note that the software have been modified.)
+  License : https://github.com/apache/iceberg/LICENSE
 
 =======================================================================
 Apache InLong Subcomponents: