Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/06 11:09:03 UTC

[inlong] branch master updated: [INLONG-7446][Sort] Upgrade MongoDB CDC to version 2.3 (#7515)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ebdb19474 [INLONG-7446][Sort] Upgrade MongoDB CDC to version 2.3 (#7515)
ebdb19474 is described below

commit ebdb19474d43a8d63c74239f508927f6017a1149
Author: emhui <11...@users.noreply.github.com>
AuthorDate: Mon Mar 6 19:08:57 2023 +0800

    [INLONG-7446][Sort] Upgrade MongoDB CDC to version 2.3 (#7515)
---
 inlong-sort/sort-connectors/mongodb-cdc/pom.xml    |   4 +
 .../sort/cdc/mongodb/DebeziumSourceFunction.java   |   2 +-
 .../inlong/sort/cdc/mongodb/MongoDBSource.java     | 106 ++++++-------
 .../internal/FlinkDatabaseSchemaHistory.java       |   2 +-
 .../cdc/mongodb/debezium/utils/RecordUtils.java    |   2 +-
 .../sort/cdc/mongodb/table/MongoDBTableSource.java |  30 +---
 .../mongodb/table/MongoDBTableSourceFactory.java   | 166 ++-------------------
 licenses/inlong-sort-connectors/LICENSE            |   2 +-
 pom.xml                                            |   2 +-
 9 files changed, 63 insertions(+), 253 deletions(-)

diff --git a/inlong-sort/sort-connectors/mongodb-cdc/pom.xml b/inlong-sort/sort-connectors/mongodb-cdc/pom.xml
index 38bb08f93..f7bc46360 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/pom.xml
+++ b/inlong-sort/sort-connectors/mongodb-cdc/pom.xml
@@ -29,6 +29,10 @@
     <packaging>jar</packaging>
     <name>Apache InLong - Sort-connector-mongodb-cdc</name>
 
+    <properties>
+        <debezium.version>1.6.4.Final</debezium.version>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>com.ververica</groupId>
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
index 9ef76410f..83bc75e49 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sort.cdc.mongodb;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope;
 import com.ververica.cdc.debezium.Validator;
 import io.debezium.connector.SnapshotRecord;
@@ -43,7 +44,6 @@ import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.util.Collector;
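
Note: the import change above replaces Flink's shaded guava18 ThreadFactoryBuilder
(no longer shipped by newer Flink distributions) with the plain Guava class. A
minimal sketch of the usual pattern; the executor and thread-name format below are
illustrative assumptions, not taken from this commit:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import com.google.common.util.concurrent.ThreadFactoryBuilder;

    public class EngineThreads {
        // Names the single Debezium engine thread so it is identifiable in thread dumps.
        public static ExecutorService newEngineExecutor(int subtaskIndex) {
            return Executors.newSingleThreadExecutor(
                    new ThreadFactoryBuilder()
                            .setNameFormat("debezium-engine-" + subtaskIndex)
                            .build());
        }
    }
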
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java
index be30c827d..4c6de78ee 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java
@@ -20,7 +20,6 @@ package org.apache.inlong.sort.cdc.mongodb;
 import com.mongodb.ConnectionString;
 import com.mongodb.client.model.changestream.FullDocument;
 import com.mongodb.kafka.connect.source.MongoSourceConfig;
-import com.mongodb.kafka.connect.source.MongoSourceConfig.ErrorTolerance;
 import com.mongodb.kafka.connect.source.MongoSourceConfig.OutputFormat;
 import com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceConnector;
 import com.ververica.cdc.debezium.Validator;
@@ -40,6 +39,13 @@ import java.util.Properties;
 
 import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.COLLECTION_INCLUDE_LIST;
 import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.DATABASE_INCLUDE_LIST;
+import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.HEARTBEAT_TOPIC_NAME;
+import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.OUTPUT_SCHEMA;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_AWAIT_TIME_MILLIS;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -52,60 +58,11 @@ public class MongoDBSource {
 
     public static final String MONGODB_SCHEME = "mongodb";
 
-    public static final String ERROR_TOLERANCE_NONE = ErrorTolerance.NONE.value();
-
-    public static final String ERROR_TOLERANCE_ALL = ErrorTolerance.ALL.value();
-
     public static final String FULL_DOCUMENT_UPDATE_LOOKUP = FullDocument.UPDATE_LOOKUP.getValue();
 
-    public static final int POLL_MAX_BATCH_SIZE_DEFAULT = 1000;
-
-    public static final int POLL_AWAIT_TIME_MILLIS_DEFAULT = 1500;
-
-    public static final String HEARTBEAT_TOPIC_NAME_DEFAULT = "__mongodb_heartbeats";
-
     public static final String OUTPUT_FORMAT_SCHEMA =
             OutputFormat.SCHEMA.name().toLowerCase(Locale.ROOT);
 
-    // Add "source" field to adapt to debezium SourceRecord
-    public static final String OUTPUT_SCHEMA_VALUE_DEFAULT =
-            "{"
-                    + "  \"name\": \"ChangeStream\","
-                    + "  \"type\": \"record\","
-                    + "  \"fields\": ["
-                    + "    { \"name\": \"_id\", \"type\": \"string\" },"
-                    + "    { \"name\": \"operationType\", \"type\": [\"string\", \"null\"] },"
-                    + "    { \"name\": \"fullDocument\", \"type\": [\"string\", \"null\"] },"
-                    + "    { \"name\": \"source\","
-                    + "      \"type\": [{\"name\": \"source\", \"type\": \"record\", \"fields\": ["
-                    + "                {\"name\": \"ts_ms\", \"type\": \"long\"},"
-                    + "                {\"name\": \"snapshot\", \"type\": [\"string\", \"null\"] } ]"
-                    + "               }, \"null\" ] },"
-                    + "    { \"name\": \"ns\","
-                    + "      \"type\": [{\"name\": \"ns\", \"type\": \"record\", \"fields\": ["
-                    + "                {\"name\": \"db\", \"type\": \"string\"},"
-                    + "                {\"name\": \"coll\", \"type\": [\"string\", \"null\"] } ]"
-                    + "               }, \"null\" ] },"
-                    + "    { \"name\": \"to\","
-                    + "      \"type\": [{\"name\": \"to\", \"type\": \"record\",  \"fields\": ["
-                    + "                {\"name\": \"db\", \"type\": \"string\"},"
-                    + "                {\"name\": \"coll\", \"type\": [\"string\", \"null\"] } ]"
-                    + "               }, \"null\" ] },"
-                    + "    { \"name\": \"documentKey\", \"type\": [\"string\", \"null\"] },"
-                    + "    { \"name\": \"updateDescription\","
-                    + "      \"type\": [{\"name\": \"updateDescription\",  \"type\": \"record\", \"fields\": ["
-                    + "                 {\"name\": \"updatedFields\", \"type\": [\"string\", \"null\"]},"
-                    + "                 {\"name\": \"removedFields\","
-                    + "                  \"type\": [{\"type\": \"array\", \"items\": \"string\"}, \"null\"]"
-                    + "                  }] }, \"null\"] },"
-                    + "    { \"name\": \"clusterTime\", \"type\": [\"string\", \"null\"] },"
-                    + "    { \"name\": \"txnNumber\", \"type\": [\"long\", \"null\"]},"
-                    + "    { \"name\": \"lsid\", \"type\": [{\"name\": \"lsid\", \"type\": \"record\","
-                    + "               \"fields\": [ {\"name\": \"id\", \"type\": \"string\"},"
-                    + "                             {\"name\": \"uid\", \"type\": \"string\"}] }, \"null\"] }"
-                    + "  ]"
-                    + "}";
-
     public static <T> Builder<T> builder() {
         return new Builder<>();
     }
@@ -127,16 +84,17 @@ public class MongoDBSource {
         private List<String> databaseList;
         private List<String> collectionList;
         private String connectionOptions;
-        private Integer batchSize;
-        private Integer pollAwaitTimeMillis = POLL_AWAIT_TIME_MILLIS_DEFAULT;
-        private Integer pollMaxBatchSize = POLL_MAX_BATCH_SIZE_DEFAULT;
-        private Boolean copyExisting = true;
+        private Integer batchSize = BATCH_SIZE.defaultValue();
+        private Integer pollAwaitTimeMillis = POLL_AWAIT_TIME_MILLIS.defaultValue();
+        private Integer pollMaxBatchSize = POLL_MAX_BATCH_SIZE.defaultValue();
+        private Boolean updateLookup = true;
+        private Boolean copyExisting = COPY_EXISTING.defaultValue();
         private Integer copyExistingMaxThreads;
         private Integer copyExistingQueueSize;
         private String copyExistingPipeline;
         private Boolean errorsLogEnable;
         private String errorsTolerance;
-        private Integer heartbeatIntervalMillis;
+        private Integer heartbeatIntervalMillis = HEARTBEAT_INTERVAL_MILLIS.defaultValue();
         private DebeziumDeserializationSchema<T> deserializer;
         private String inlongMetric;
         private String inlongAudit;
@@ -188,7 +146,11 @@ public class MongoDBSource {
         /**
          * batch.size
          *
-         * <p>The cursor batch size. Default: 0
+         * <p>The cursor batch size. Default: 1024
+         *
+         * <p>The change stream cursor batch size. Specifies the maximum number of change events to
+         * return in each batch of the response from the MongoDB cluster. The default is 0 meaning
+         * it uses the server's default value. Default: 0
          */
         public Builder<T> batchSize(int batchSize) {
             checkArgument(batchSize >= 0);
@@ -200,7 +162,7 @@ public class MongoDBSource {
          * poll.await.time.ms
          *
          * <p>The amount of time to wait before checking for new results on the change stream.
-         * Default: 3000
+         * Default: 1000
          */
         public Builder<T> pollAwaitTimeMillis(int pollAwaitTimeMillis) {
             checkArgument(pollAwaitTimeMillis > 0);
@@ -213,7 +175,7 @@ public class MongoDBSource {
          *
          * <p>Maximum number of change stream documents to include in a single batch when polling
          * for new data. This setting can be used to limit the amount of data buffered internally in
-         * the connector. Default: 1000
+         * the connector. Default: 1024
          */
         public Builder<T> pollMaxBatchSize(int pollMaxBatchSize) {
             checkArgument(pollMaxBatchSize > 0);
@@ -221,6 +183,19 @@ public class MongoDBSource {
             return this;
         }
 
+        /**
+         * change.stream.full.document
+         *
+         * <p>Determines what to return for update operations when using a Change Stream. When set
+         * to true, the change stream for partial updates will include both a delta describing the
+         * changes to the document and a copy of the entire document that was changed from some time
+         * after the change occurred. Default: true
+         */
+        public Builder<T> updateLookup(boolean updateLookup) {
+            this.updateLookup = updateLookup;
+            return this;
+        }
+
         /**
          * copy.existing
          *
@@ -248,7 +223,7 @@ public class MongoDBSource {
         /**
          * copy.existing.queue.size
          *
-         * <p>The max size of the queue to use when copying data. Default: 16000
+         * <p>The max size of the queue to use when copying data. Default: 10240
          */
         public Builder<T> copyExistingQueueSize(int copyExistingQueueSize) {
             checkArgument(copyExistingQueueSize > 0);
@@ -363,7 +338,7 @@ public class MongoDBSource {
 
             props.setProperty(
                     "connector.class", MongoDBConnectorSourceConnector.class.getCanonicalName());
-            props.setProperty("name", "mongodb_binlog_source");
+            props.setProperty("name", "mongodb_cdc_source");
 
             ConnectionString connectionString = buildConnectionUri();
             props.setProperty(
@@ -377,7 +352,10 @@ public class MongoDBSource {
                 props.setProperty(COLLECTION_INCLUDE_LIST, String.join(",", collectionList));
             }
 
-            props.setProperty(MongoSourceConfig.FULL_DOCUMENT_CONFIG, FULL_DOCUMENT_UPDATE_LOOKUP);
+            if (updateLookup) {
+                props.setProperty(
+                        MongoSourceConfig.FULL_DOCUMENT_CONFIG, FULL_DOCUMENT_UPDATE_LOOKUP);
+            }
             props.setProperty(
                     MongoSourceConfig.PUBLISH_FULL_DOCUMENT_ONLY_CONFIG,
                     String.valueOf(Boolean.FALSE));
@@ -388,7 +366,7 @@ public class MongoDBSource {
                     MongoSourceConfig.OUTPUT_SCHEMA_INFER_VALUE_CONFIG,
                     String.valueOf(Boolean.FALSE));
             props.setProperty(
-                    MongoSourceConfig.OUTPUT_SCHEMA_VALUE_CONFIG, OUTPUT_SCHEMA_VALUE_DEFAULT);
+                    MongoSourceConfig.OUTPUT_SCHEMA_VALUE_CONFIG, OUTPUT_SCHEMA);
 
             if (batchSize != null) {
                 props.setProperty(MongoSourceConfig.BATCH_SIZE_CONFIG, String.valueOf(batchSize));
@@ -445,11 +423,11 @@ public class MongoDBSource {
             }
 
             props.setProperty(
-                    MongoSourceConfig.HEARTBEAT_TOPIC_NAME_CONFIG, HEARTBEAT_TOPIC_NAME_DEFAULT);
+                    MongoSourceConfig.HEARTBEAT_TOPIC_NAME_CONFIG, HEARTBEAT_TOPIC_NAME);
 
             // Let DebeziumChangeFetcher recognize heartbeat record
             props.setProperty(
-                    Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(), HEARTBEAT_TOPIC_NAME_DEFAULT);
+                    Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(), HEARTBEAT_TOPIC_NAME);
 
             return new DebeziumSourceFunction<>(
                     deserializer, props, null, Validator.getDefaultValidator(),
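
Note: with the new updateLookup flag, full-document lookup for partial updates
becomes configurable instead of hard-coded. A hedged sketch of the builder after
this change; host, database, and collection values are placeholders, and the
setter names are assumed to follow the upstream ververica connector this class is
derived from:

    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

    DebeziumSourceFunction<String> source =
            MongoDBSource.<String>builder()
                    .hosts("localhost:27017")
                    .databaseList("inventory")
                    .collectionList("inventory.orders")
                    .updateLookup(true)  // sets change.stream.full.document=updateLookup
                    .copyExisting(true)  // snapshot existing data before streaming
                    .deserializer(new JsonDebeziumDeserializationSchema())
                    .build();
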
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/internal/FlinkDatabaseSchemaHistory.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/internal/FlinkDatabaseSchemaHistory.java
index a2919a739..10f0ca1a6 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/internal/FlinkDatabaseSchemaHistory.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/internal/FlinkDatabaseSchemaHistory.java
@@ -185,7 +185,7 @@ public class FlinkDatabaseSchemaHistory implements DatabaseHistory {
     }
 
     @Override
-    public boolean storeOnlyMonitoredTables() {
+    public boolean storeOnlyCapturedTables() {
         return storeOnlyMonitoredTablesDdl;
     }
 
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/utils/RecordUtils.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/utils/RecordUtils.java
index fa0def5b4..26a657aa2 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/utils/RecordUtils.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/utils/RecordUtils.java
@@ -17,13 +17,13 @@
 
 package org.apache.inlong.sort.cdc.mongodb.debezium.utils;
 
+import com.google.common.collect.ImmutableMap;
 import io.debezium.connector.AbstractSourceInfo;
 import io.debezium.relational.TableId;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.BinaryType;
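
Note: same shaded-to-plain Guava migration as above, here for ImmutableMap. A
one-line sketch of the pattern; the keys and values are illustrative, not taken
from RecordUtils itself:

    import com.google.common.collect.ImmutableMap;
    import java.util.Map;

    Map<String, Object> sourceOffset = ImmutableMap.of("ts_ms", 0L, "snapshot", "true");
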
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
index 2bddb9ace..76ae9cc0b 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
@@ -48,7 +48,7 @@ import java.util.stream.Stream;
 
 import static com.mongodb.MongoNamespace.checkCollectionNameValidity;
 import static com.mongodb.MongoNamespace.checkDatabaseNameValidity;
-import static com.ververica.cdc.connectors.mongodb.utils.CollectionDiscoveryUtils.containsRegexMetaCharacters;
+import static com.ververica.cdc.connectors.mongodb.source.utils.CollectionDiscoveryUtils.containsRegexMetaCharacters;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -64,11 +64,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
     private final String password;
     private final String database;
     private final String collection;
-    private final Boolean errorsLogEnable;
-    private final String errorsTolerance;
     private final Boolean copyExisting;
-    private final String copyExistingPipeline;
-    private final Integer copyExistingMaxThreads;
     private final Integer copyExistingQueueSize;
     private final Integer pollMaxBatchSize;
     private final Integer pollAwaitTimeMillis;
@@ -98,11 +94,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
             @Nullable String database,
             @Nullable String collection,
             @Nullable String connectionOptions,
-            @Nullable String errorsTolerance,
-            @Nullable Boolean errorsLogEnable,
             @Nullable Boolean copyExisting,
-            @Nullable String copyExistingPipeline,
-            @Nullable Integer copyExistingMaxThreads,
             @Nullable Integer copyExistingQueueSize,
             @Nullable Integer pollMaxBatchSize,
             @Nullable Integer pollAwaitTimeMillis,
@@ -119,11 +111,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
         this.database = database;
         this.collection = collection;
         this.connectionOptions = connectionOptions;
-        this.errorsTolerance = errorsTolerance;
-        this.errorsLogEnable = errorsLogEnable;
         this.copyExisting = copyExisting;
-        this.copyExistingPipeline = copyExistingPipeline;
-        this.copyExistingMaxThreads = copyExistingMaxThreads;
         this.copyExistingQueueSize = copyExistingQueueSize;
         this.pollMaxBatchSize = pollMaxBatchSize;
         this.pollAwaitTimeMillis = pollAwaitTimeMillis;
@@ -187,11 +175,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
         Optional.ofNullable(username).ifPresent(builder::username);
         Optional.ofNullable(password).ifPresent(builder::password);
         Optional.ofNullable(connectionOptions).ifPresent(builder::connectionOptions);
-        Optional.ofNullable(errorsLogEnable).ifPresent(builder::errorsLogEnable);
-        Optional.ofNullable(errorsTolerance).ifPresent(builder::errorsTolerance);
         Optional.ofNullable(copyExisting).ifPresent(builder::copyExisting);
-        Optional.ofNullable(copyExistingPipeline).ifPresent(builder::copyExistingPipeline);
-        Optional.ofNullable(copyExistingMaxThreads).ifPresent(builder::copyExistingMaxThreads);
         Optional.ofNullable(copyExistingQueueSize).ifPresent(builder::copyExistingQueueSize);
         Optional.ofNullable(pollMaxBatchSize).ifPresent(builder::pollMaxBatchSize);
         Optional.ofNullable(pollAwaitTimeMillis).ifPresent(builder::pollAwaitTimeMillis);
@@ -245,11 +229,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
                         database,
                         collection,
                         connectionOptions,
-                        errorsTolerance,
-                        errorsLogEnable,
                         copyExisting,
-                        copyExistingPipeline,
-                        copyExistingMaxThreads,
                         copyExistingQueueSize,
                         pollMaxBatchSize,
                         pollAwaitTimeMillis,
@@ -280,11 +260,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
                 && Objects.equals(database, that.database)
                 && Objects.equals(collection, that.collection)
                 && Objects.equals(connectionOptions, that.connectionOptions)
-                && Objects.equals(errorsTolerance, that.errorsTolerance)
-                && Objects.equals(errorsLogEnable, that.errorsLogEnable)
                 && Objects.equals(copyExisting, that.copyExisting)
-                && Objects.equals(copyExistingPipeline, that.copyExistingPipeline)
-                && Objects.equals(copyExistingMaxThreads, that.copyExistingMaxThreads)
                 && Objects.equals(copyExistingQueueSize, that.copyExistingQueueSize)
                 && Objects.equals(pollMaxBatchSize, that.pollMaxBatchSize)
                 && Objects.equals(pollAwaitTimeMillis, that.pollAwaitTimeMillis)
@@ -306,11 +282,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
                 database,
                 collection,
                 connectionOptions,
-                errorsTolerance,
-                errorsLogEnable,
                 copyExisting,
-                copyExistingPipeline,
-                copyExistingMaxThreads,
                 copyExistingQueueSize,
                 pollMaxBatchSize,
                 pollAwaitTimeMillis,
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
index 9af14d86e..7fe19e27f 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
@@ -31,9 +31,17 @@ import java.time.ZoneId;
 import java.util.HashSet;
 import java.util.Set;
 
-import static com.ververica.cdc.connectors.mongodb.MongoDBSource.ERROR_TOLERANCE_NONE;
-import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_AWAIT_TIME_MILLIS_DEFAULT;
-import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_MAX_BATCH_SIZE_DEFAULT;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COLLECTION;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.CONNECTION_OPTIONS;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING_QUEUE_SIZE;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.DATABASE;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HOSTS;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.PASSWORD;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_AWAIT_TIME_MILLIS;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.USERNAME;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
@@ -48,145 +56,6 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
 
     private static final String DOCUMENT_ID_FIELD = "_id";
 
-    private static final ConfigOption<String> HOSTS =
-            ConfigOptions.key("hosts")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "The comma-separated list of hostname and port pairs of the MongoDB servers. "
-                                    + "eg. localhost:27017,localhost:27018");
-
-    private static final ConfigOption<String> USERNAME =
-            ConfigOptions.key("username")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "Name of the database user to be used when connecting to MongoDB. "
-                                    + "This is required only when MongoDB is configured to use authentication.");
-
-    private static final ConfigOption<String> PASSWORD =
-            ConfigOptions.key("password")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "Password to be used when connecting to MongoDB. "
-                                    + "This is required only when MongoDB is configured to use authentication.");
-
-    private static final ConfigOption<String> DATABASE =
-            ConfigOptions.key("database")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "Name of the database to watch for changes."
-                                    + "The database also supports regular expression "
-                                    + "to monitor multiple databases matches the regular expression."
-                                    + "e.g. db[0-9] .");
-
-    private static final ConfigOption<String> COLLECTION =
-            ConfigOptions.key("collection")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "Name of the collection in the database to watch for changes."
-                                    + "The collection also supports regular expression "
-                                    + "to monitor multiple collections matches fully-qualified collection identifiers."
-                                    + "e.g. db0\\.coll[0-9] .");
-
-    private static final ConfigOption<String> CONNECTION_OPTIONS =
-            ConfigOptions.key("connection.options")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "The ampersand-separated MongoDB connection options. "
-                                    + "eg. replicaSet=test&connectTimeoutMS=300000");
-
-    private static final ConfigOption<String> ERRORS_TOLERANCE =
-            ConfigOptions.key("errors.tolerance")
-                    .stringType()
-                    .defaultValue(ERROR_TOLERANCE_NONE)
-                    .withDescription(
-                            "Whether to continue processing messages if an error is encountered. "
-                                    + "When set to none, the connector reports an error and blocks further processing "
-                                    + "of the rest of the records when it encounters an error. "
-                                    + "When set to all, the connector silently ignores any bad messages."
-                                    + "Accepted Values: 'none' or 'all'. Default 'none'.");
-
-    private static final ConfigOption<Boolean> ERRORS_LOG_ENABLE =
-            ConfigOptions.key("errors.log.enable")
-                    .booleanType()
-                    .defaultValue(Boolean.TRUE)
-                    .withDescription(
-                            "Whether details of failed operations should be written to the log file. "
-                                    + "When set to true, both errors that are tolerated (determined by the errors"
-                                    + ".tolerance setting) "
-                                    + "and not tolerated are written. When set to false, errors that are tolerated "
-                                    + "are omitted.");
-
-    private static final ConfigOption<Boolean> COPY_EXISTING =
-            ConfigOptions.key("copy.existing")
-                    .booleanType()
-                    .defaultValue(Boolean.TRUE)
-                    .withDescription(
-                            "Copy existing data from source collections and convert them "
-                                    + "to Change Stream events on their respective topics. Any changes to the data "
-                                    + "that occur during the copy process are applied once the copy is completed.");
-
-    private static final ConfigOption<String> COPY_EXISTING_PIPELINE =
-            ConfigOptions.key("copy.existing.pipeline")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "An array of JSON objects describing the pipeline operations "
-                                    + "to run when copying existing data. "
-                                    + "This can improve the use of indexes by the copying manager and make copying "
-                                    + "more efficient.");
-
-    private static final ConfigOption<Integer> COPY_EXISTING_MAX_THREADS =
-            ConfigOptions.key("copy.existing.max.threads")
-                    .intType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "The number of threads to use when performing the data copy."
-                                    + " Defaults to the number of processors.");
-
-    private static final ConfigOption<Integer> COPY_EXISTING_QUEUE_SIZE =
-            ConfigOptions.key("copy.existing.queue.size")
-                    .intType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "The max size of the queue to use when copying data. Defaults to 16000.");
-
-    private static final ConfigOption<Integer> POLL_MAX_BATCH_SIZE =
-            ConfigOptions.key("poll.max.batch.size")
-                    .intType()
-                    .defaultValue(POLL_MAX_BATCH_SIZE_DEFAULT)
-                    .withDescription(
-                            "Maximum number of change stream documents "
-                                    + "to include in a single batch when polling for new data. "
-                                    + "This setting can be used to limit the amount of data buffered internally in "
-                                    + "the connector. "
-                                    + "Defaults to 1000.");
-
-    private static final ConfigOption<Integer> POLL_AWAIT_TIME_MILLIS =
-            ConfigOptions.key("poll.await.time.ms")
-                    .intType()
-                    .defaultValue(POLL_AWAIT_TIME_MILLIS_DEFAULT)
-                    .withDescription(
-                            "The amount of time to wait before checking for new results on the change stream."
-                                    + "Defaults: 1500.");
-
-    private static final ConfigOption<Integer> HEARTBEAT_INTERVAL_MILLIS =
-            ConfigOptions.key("heartbeat.interval.ms")
-                    .intType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "The length of time in milliseconds between sending heartbeat messages."
-                                    + "Heartbeat messages contain the post batch resume token and are sent when no "
-                                    + "source records "
-                                    + "have been published in the specified interval. This improves the resumability "
-                                    + "of the connector "
-                                    + "for low volume namespaces. Use 0 to disable. Defaults to 0.");
-
     public static final ConfigOption<String> ROW_KINDS_FILTERED =
             ConfigOptions.key("row-kinds-filtered")
                     .stringType()
@@ -216,9 +85,6 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
         final String database = config.getOptional(DATABASE).orElse(null);
         final String collection = config.getOptional(COLLECTION).orElse(null);
 
-        final String errorsTolerance = config.get(ERRORS_TOLERANCE);
-        final Boolean errorsLogEnable = config.get(ERRORS_LOG_ENABLE);
-
         final Integer pollMaxBatchSize = config.get(POLL_MAX_BATCH_SIZE);
         final Integer pollAwaitTimeMillis = config.get(POLL_AWAIT_TIME_MILLIS);
 
@@ -226,8 +92,6 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
                 config.getOptional(HEARTBEAT_INTERVAL_MILLIS).orElse(null);
 
         final Boolean copyExisting = config.get(COPY_EXISTING);
-        final String copyExistingPipeline = config.getOptional(COPY_EXISTING_PIPELINE).orElse(null);
-        final Integer copyExistingMaxThreads = config.getOptional(COPY_EXISTING_MAX_THREADS).orElse(null);
         final Integer copyExistingQueueSize = config.getOptional(COPY_EXISTING_QUEUE_SIZE).orElse(null);
 
         final String zoneId = context.getConfiguration().get(TableConfigOptions.LOCAL_TIME_ZONE);
@@ -255,11 +119,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
                 database,
                 collection,
                 connectionOptions,
-                errorsTolerance,
-                errorsLogEnable,
                 copyExisting,
-                copyExistingPipeline,
-                copyExistingMaxThreads,
                 copyExistingQueueSize,
                 pollMaxBatchSize,
                 pollAwaitTimeMillis,
@@ -297,11 +157,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
         options.add(CONNECTION_OPTIONS);
         options.add(DATABASE);
         options.add(COLLECTION);
-        options.add(ERRORS_TOLERANCE);
-        options.add(ERRORS_LOG_ENABLE);
         options.add(COPY_EXISTING);
-        options.add(COPY_EXISTING_PIPELINE);
-        options.add(COPY_EXISTING_MAX_THREADS);
         options.add(COPY_EXISTING_QUEUE_SIZE);
         options.add(POLL_MAX_BATCH_SIZE);
         options.add(POLL_AWAIT_TIME_MILLIS);
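
Note: after this change the factory declares only the options that remain
supported; errors.tolerance, errors.log.enable, copy.existing.pipeline, and
copy.existing.max.threads are gone. A hedged Flink SQL sketch using surviving
options; the 'mongodb-cdc-inlong' connector identifier, the schema, and the
endpoint are illustrative assumptions:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class MongoCdcDdlExample {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());
            // Only options still registered by the factory after this commit.
            tEnv.executeSql(
                    "CREATE TABLE orders (\n"
                            + "  _id STRING,\n"
                            + "  amount DECIMAL(10, 2),\n"
                            + "  PRIMARY KEY (_id) NOT ENFORCED\n"
                            + ") WITH (\n"
                            + "  'connector' = 'mongodb-cdc-inlong',\n"
                            + "  'hosts' = 'localhost:27017',\n"
                            + "  'database' = 'inventory',\n"
                            + "  'collection' = 'orders',\n"
                            + "  'copy.existing' = 'true',\n"
                            + "  'poll.max.batch.size' = '1024',\n"
                            + "  'poll.await.time.ms' = '1000'\n"
                            + ")");
        }
    }
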
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index b7985228b..bae2f8cac 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -776,7 +776,7 @@ The text of each license is the standard Apache 2.0 license.
   org.apache.flink:flink-connector-hive_2.11:1.13.5 - Flink : Connectors : Hive (https://github.com/apache/flink/tree/release-1.13.5/flink-connectors/flink-connector-hive), (The Apache Software License, Version 2.0)
   org.apache.flink:flink-connector-jdbc_2.11:1.13.5 - Flink : Connectors : JDBC (https://github.com/apache/flink/tree/release-1.13.5/flink-connectors/flink-connector-jdbc), (The Apache Software License, Version 2.0)
   org.apache.flink:flink-connector-kafka_2.11:1.13.5 - Flink : Connectors : Kafka (https://github.com/apache/flink/tree/release-1.13.5/flink-connectors/flink-connector-kafka), (The Apache Software License, Version 2.0)
-  com.ververica:flink-connector-mongodb-cdc:2.2.1 - flink-connector-mongodb-cdc (https://github.com/ververica/flink-cdc-connectors/tree/release-2.2.1/flink-connector-mongodb-cdc), (The Apache Software License, Version 2.0)
+  com.ververica:flink-connector-mongodb-cdc:2.3.0 - flink-connector-mongodb-cdc (https://github.com/ververica/flink-cdc-connectors/tree/release-2.3.0/flink-connector-mongodb-cdc), (The Apache Software License, Version 2.0)
   com.ververica:flink-connector-mysql-cdc:2.2.1 - flink-connector-mysql-cdc (https://github.com/ververica/flink-cdc-connectors/tree/release-2.2.1/flink-connector-mysql-cdc), (The Apache Software License, Version 2.0)
   com.ververica:flink-connector-oracle-cdc:2.3.0 - flink-connector-oracle-cdc (https://github.com/ververica/flink-cdc-connectors/tree/release-2.3.0/flink-connector-oracle-cdc/), (The Apache Software License, Version 2.0)
   com.ververica:flink-connector-postgres-cdc:2.2.1 - flink-connector-postgres-cdc (https://github.com/ververica/flink-cdc-connectors/tree/release-2.2.1/flink-connector-postgres-cdc), (The Apache Software License, Version 2.0)
diff --git a/pom.xml b/pom.xml
index ce51e28ba..2be54be88 100644
--- a/pom.xml
+++ b/pom.xml
@@ -161,7 +161,7 @@
         <flink.connector.sqlserver.cdc.version>2.2.1</flink.connector.sqlserver.cdc.version>
         <flink.pulsar.version>1.13.6.2</flink.pulsar.version>
         <flink.protobuf.version>2.7.6</flink.protobuf.version>
-        <flink.connector.mongodb.cdc.version>2.2.1</flink.connector.mongodb.cdc.version>
+        <flink.connector.mongodb.cdc.version>2.3.0</flink.connector.mongodb.cdc.version>
         <flink.connector.oracle.cdc.version>2.3.0</flink.connector.oracle.cdc.version>
         <flink.connector.doris.version>1.0.3</flink.connector.doris.version>
         <flink.connector.redis>1.1.0</flink.connector.redis>