You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/06 11:09:03 UTC
[inlong] branch master updated: [INLONG-7446][Sort] Upgrade MongoDB CDC to version 2.3 (#7515)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ebdb19474 [INLONG-7446][Sort] Upgrade MongoDB CDC to version 2.3 (#7515)
ebdb19474 is described below
commit ebdb19474d43a8d63c74239f508927f6017a1149
Author: emhui <11...@users.noreply.github.com>
AuthorDate: Mon Mar 6 19:08:57 2023 +0800
[INLONG-7446][Sort] Upgrade MongoDB CDC to version 2.3 (#7515)
---
inlong-sort/sort-connectors/mongodb-cdc/pom.xml | 4 +
.../sort/cdc/mongodb/DebeziumSourceFunction.java | 2 +-
.../inlong/sort/cdc/mongodb/MongoDBSource.java | 106 ++++++-------
.../internal/FlinkDatabaseSchemaHistory.java | 2 +-
.../cdc/mongodb/debezium/utils/RecordUtils.java | 2 +-
.../sort/cdc/mongodb/table/MongoDBTableSource.java | 30 +---
.../mongodb/table/MongoDBTableSourceFactory.java | 166 ++-------------------
licenses/inlong-sort-connectors/LICENSE | 2 +-
pom.xml | 2 +-
9 files changed, 63 insertions(+), 253 deletions(-)
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/pom.xml b/inlong-sort/sort-connectors/mongodb-cdc/pom.xml
index 38bb08f93..f7bc46360 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/pom.xml
+++ b/inlong-sort/sort-connectors/mongodb-cdc/pom.xml
@@ -29,6 +29,10 @@
<packaging>jar</packaging>
<name>Apache InLong - Sort-connector-mongodb-cdc</name>
+ <properties>
+ <debezium.version>1.6.4.Final</debezium.version>
+ </properties>
+
<dependencies>
<dependency>
<groupId>com.ververica</groupId>
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
index 9ef76410f..83bc75e49 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sort.cdc.mongodb;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope;
import com.ververica.cdc.debezium.Validator;
import io.debezium.connector.SnapshotRecord;
@@ -43,7 +44,6 @@ import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java
index be30c827d..4c6de78ee 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java
@@ -20,7 +20,6 @@ package org.apache.inlong.sort.cdc.mongodb;
import com.mongodb.ConnectionString;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.kafka.connect.source.MongoSourceConfig;
-import com.mongodb.kafka.connect.source.MongoSourceConfig.ErrorTolerance;
import com.mongodb.kafka.connect.source.MongoSourceConfig.OutputFormat;
import com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceConnector;
import com.ververica.cdc.debezium.Validator;
@@ -40,6 +39,13 @@ import java.util.Properties;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.COLLECTION_INCLUDE_LIST;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.DATABASE_INCLUDE_LIST;
+import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.HEARTBEAT_TOPIC_NAME;
+import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.OUTPUT_SCHEMA;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_AWAIT_TIME_MILLIS;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -52,60 +58,11 @@ public class MongoDBSource {
public static final String MONGODB_SCHEME = "mongodb";
- public static final String ERROR_TOLERANCE_NONE = ErrorTolerance.NONE.value();
-
- public static final String ERROR_TOLERANCE_ALL = ErrorTolerance.ALL.value();
-
public static final String FULL_DOCUMENT_UPDATE_LOOKUP = FullDocument.UPDATE_LOOKUP.getValue();
- public static final int POLL_MAX_BATCH_SIZE_DEFAULT = 1000;
-
- public static final int POLL_AWAIT_TIME_MILLIS_DEFAULT = 1500;
-
- public static final String HEARTBEAT_TOPIC_NAME_DEFAULT = "__mongodb_heartbeats";
-
public static final String OUTPUT_FORMAT_SCHEMA =
OutputFormat.SCHEMA.name().toLowerCase(Locale.ROOT);
- // Add "source" field to adapt to debezium SourceRecord
- public static final String OUTPUT_SCHEMA_VALUE_DEFAULT =
- "{"
- + " \"name\": \"ChangeStream\","
- + " \"type\": \"record\","
- + " \"fields\": ["
- + " { \"name\": \"_id\", \"type\": \"string\" },"
- + " { \"name\": \"operationType\", \"type\": [\"string\", \"null\"] },"
- + " { \"name\": \"fullDocument\", \"type\": [\"string\", \"null\"] },"
- + " { \"name\": \"source\","
- + " \"type\": [{\"name\": \"source\", \"type\": \"record\", \"fields\": ["
- + " {\"name\": \"ts_ms\", \"type\": \"long\"},"
- + " {\"name\": \"snapshot\", \"type\": [\"string\", \"null\"] } ]"
- + " }, \"null\" ] },"
- + " { \"name\": \"ns\","
- + " \"type\": [{\"name\": \"ns\", \"type\": \"record\", \"fields\": ["
- + " {\"name\": \"db\", \"type\": \"string\"},"
- + " {\"name\": \"coll\", \"type\": [\"string\", \"null\"] } ]"
- + " }, \"null\" ] },"
- + " { \"name\": \"to\","
- + " \"type\": [{\"name\": \"to\", \"type\": \"record\", \"fields\": ["
- + " {\"name\": \"db\", \"type\": \"string\"},"
- + " {\"name\": \"coll\", \"type\": [\"string\", \"null\"] } ]"
- + " }, \"null\" ] },"
- + " { \"name\": \"documentKey\", \"type\": [\"string\", \"null\"] },"
- + " { \"name\": \"updateDescription\","
- + " \"type\": [{\"name\": \"updateDescription\", \"type\": \"record\", \"fields\": ["
- + " {\"name\": \"updatedFields\", \"type\": [\"string\", \"null\"]},"
- + " {\"name\": \"removedFields\","
- + " \"type\": [{\"type\": \"array\", \"items\": \"string\"}, \"null\"]"
- + " }] }, \"null\"] },"
- + " { \"name\": \"clusterTime\", \"type\": [\"string\", \"null\"] },"
- + " { \"name\": \"txnNumber\", \"type\": [\"long\", \"null\"]},"
- + " { \"name\": \"lsid\", \"type\": [{\"name\": \"lsid\", \"type\": \"record\","
- + " \"fields\": [ {\"name\": \"id\", \"type\": \"string\"},"
- + " {\"name\": \"uid\", \"type\": \"string\"}] }, \"null\"] }"
- + " ]"
- + "}";
-
public static <T> Builder<T> builder() {
return new Builder<>();
}
@@ -127,16 +84,17 @@ public class MongoDBSource {
private List<String> databaseList;
private List<String> collectionList;
private String connectionOptions;
- private Integer batchSize;
- private Integer pollAwaitTimeMillis = POLL_AWAIT_TIME_MILLIS_DEFAULT;
- private Integer pollMaxBatchSize = POLL_MAX_BATCH_SIZE_DEFAULT;
- private Boolean copyExisting = true;
+ private Integer batchSize = BATCH_SIZE.defaultValue();
+ private Integer pollAwaitTimeMillis = POLL_AWAIT_TIME_MILLIS.defaultValue();
+ private Integer pollMaxBatchSize = POLL_MAX_BATCH_SIZE.defaultValue();
+ private Boolean updateLookup = true;
+ private Boolean copyExisting = COPY_EXISTING.defaultValue();
private Integer copyExistingMaxThreads;
private Integer copyExistingQueueSize;
private String copyExistingPipeline;
private Boolean errorsLogEnable;
private String errorsTolerance;
- private Integer heartbeatIntervalMillis;
+ private Integer heartbeatIntervalMillis = HEARTBEAT_INTERVAL_MILLIS.defaultValue();
private DebeziumDeserializationSchema<T> deserializer;
private String inlongMetric;
private String inlongAudit;
@@ -188,7 +146,11 @@ public class MongoDBSource {
/**
* batch.size
*
- * <p>The cursor batch size. Default: 0
+ * <p>The cursor batch size. Default: 1024
+ *
+ * <p>The change stream cursor batch size. Specifies the maximum number of change events to
+ * return in each batch of the response from the MongoDB cluster. A value of 0 means the
+ * server's default value is used.
*/
public Builder<T> batchSize(int batchSize) {
checkArgument(batchSize >= 0);
@@ -200,7 +162,7 @@ public class MongoDBSource {
* poll.await.time.ms
*
* <p>The amount of time to wait before checking for new results on the change stream.
- * Default: 3000
+ * Default: 1000
*/
public Builder<T> pollAwaitTimeMillis(int pollAwaitTimeMillis) {
checkArgument(pollAwaitTimeMillis > 0);
@@ -213,7 +175,7 @@ public class MongoDBSource {
*
* <p>Maximum number of change stream documents to include in a single batch when polling
* for new data. This setting can be used to limit the amount of data buffered internally in
- * the connector. Default: 1000
+ * the connector. Default: 1024
*/
public Builder<T> pollMaxBatchSize(int pollMaxBatchSize) {
checkArgument(pollMaxBatchSize > 0);
@@ -221,6 +183,19 @@ public class MongoDBSource {
return this;
}
+ /**
+ * change.stream.full.document
+ *
+ * <p>Determines what to return for update operations when using a Change Stream. When set
+ * to true, the change stream for partial updates will include both a delta describing the
+ * changes to the document and a copy of the entire document that was changed from some time
+ * after the change occurred. Default: true
+ */
+ public Builder<T> updateLookup(boolean updateLookup) {
+ this.updateLookup = updateLookup;
+ return this;
+ }
+
/**
* copy.existing
*
@@ -248,7 +223,7 @@ public class MongoDBSource {
/**
* copy.existing.queue.size
*
- * <p>The max size of the queue to use when copying data. Default: 16000
+ * <p>The max size of the queue to use when copying data. Default: 10240
*/
public Builder<T> copyExistingQueueSize(int copyExistingQueueSize) {
checkArgument(copyExistingQueueSize > 0);
@@ -363,7 +338,7 @@ public class MongoDBSource {
props.setProperty(
"connector.class", MongoDBConnectorSourceConnector.class.getCanonicalName());
- props.setProperty("name", "mongodb_binlog_source");
+ props.setProperty("name", "mongodb_cdc_source");
ConnectionString connectionString = buildConnectionUri();
props.setProperty(
@@ -377,7 +352,10 @@ public class MongoDBSource {
props.setProperty(COLLECTION_INCLUDE_LIST, String.join(",", collectionList));
}
- props.setProperty(MongoSourceConfig.FULL_DOCUMENT_CONFIG, FULL_DOCUMENT_UPDATE_LOOKUP);
+ if (updateLookup) {
+ props.setProperty(
+ MongoSourceConfig.FULL_DOCUMENT_CONFIG, FULL_DOCUMENT_UPDATE_LOOKUP);
+ }
props.setProperty(
MongoSourceConfig.PUBLISH_FULL_DOCUMENT_ONLY_CONFIG,
String.valueOf(Boolean.FALSE));
@@ -388,7 +366,7 @@ public class MongoDBSource {
MongoSourceConfig.OUTPUT_SCHEMA_INFER_VALUE_CONFIG,
String.valueOf(Boolean.FALSE));
props.setProperty(
- MongoSourceConfig.OUTPUT_SCHEMA_VALUE_CONFIG, OUTPUT_SCHEMA_VALUE_DEFAULT);
+ MongoSourceConfig.OUTPUT_SCHEMA_VALUE_CONFIG, OUTPUT_SCHEMA);
if (batchSize != null) {
props.setProperty(MongoSourceConfig.BATCH_SIZE_CONFIG, String.valueOf(batchSize));
@@ -445,11 +423,11 @@ public class MongoDBSource {
}
props.setProperty(
- MongoSourceConfig.HEARTBEAT_TOPIC_NAME_CONFIG, HEARTBEAT_TOPIC_NAME_DEFAULT);
+ MongoSourceConfig.HEARTBEAT_TOPIC_NAME_CONFIG, HEARTBEAT_TOPIC_NAME);
// Let DebeziumChangeFetcher recognize heartbeat record
props.setProperty(
- Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(), HEARTBEAT_TOPIC_NAME_DEFAULT);
+ Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(), HEARTBEAT_TOPIC_NAME);
return new DebeziumSourceFunction<>(
deserializer, props, null, Validator.getDefaultValidator(),
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/internal/FlinkDatabaseSchemaHistory.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/internal/FlinkDatabaseSchemaHistory.java
index a2919a739..10f0ca1a6 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/internal/FlinkDatabaseSchemaHistory.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/internal/FlinkDatabaseSchemaHistory.java
@@ -185,7 +185,7 @@ public class FlinkDatabaseSchemaHistory implements DatabaseHistory {
}
@Override
- public boolean storeOnlyMonitoredTables() {
+ public boolean storeOnlyCapturedTables() {
return storeOnlyMonitoredTablesDdl;
}
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/utils/RecordUtils.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/utils/RecordUtils.java
index fa0def5b4..26a657aa2 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/utils/RecordUtils.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/utils/RecordUtils.java
@@ -17,13 +17,13 @@
package org.apache.inlong.sort.cdc.mongodb.debezium.utils;
+import com.google.common.collect.ImmutableMap;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.relational.TableId;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BinaryType;
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
index 2bddb9ace..76ae9cc0b 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
@@ -48,7 +48,7 @@ import java.util.stream.Stream;
import static com.mongodb.MongoNamespace.checkCollectionNameValidity;
import static com.mongodb.MongoNamespace.checkDatabaseNameValidity;
-import static com.ververica.cdc.connectors.mongodb.utils.CollectionDiscoveryUtils.containsRegexMetaCharacters;
+import static com.ververica.cdc.connectors.mongodb.source.utils.CollectionDiscoveryUtils.containsRegexMetaCharacters;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -64,11 +64,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
private final String password;
private final String database;
private final String collection;
- private final Boolean errorsLogEnable;
- private final String errorsTolerance;
private final Boolean copyExisting;
- private final String copyExistingPipeline;
- private final Integer copyExistingMaxThreads;
private final Integer copyExistingQueueSize;
private final Integer pollMaxBatchSize;
private final Integer pollAwaitTimeMillis;
@@ -98,11 +94,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
@Nullable String database,
@Nullable String collection,
@Nullable String connectionOptions,
- @Nullable String errorsTolerance,
- @Nullable Boolean errorsLogEnable,
@Nullable Boolean copyExisting,
- @Nullable String copyExistingPipeline,
- @Nullable Integer copyExistingMaxThreads,
@Nullable Integer copyExistingQueueSize,
@Nullable Integer pollMaxBatchSize,
@Nullable Integer pollAwaitTimeMillis,
@@ -119,11 +111,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
this.database = database;
this.collection = collection;
this.connectionOptions = connectionOptions;
- this.errorsTolerance = errorsTolerance;
- this.errorsLogEnable = errorsLogEnable;
this.copyExisting = copyExisting;
- this.copyExistingPipeline = copyExistingPipeline;
- this.copyExistingMaxThreads = copyExistingMaxThreads;
this.copyExistingQueueSize = copyExistingQueueSize;
this.pollMaxBatchSize = pollMaxBatchSize;
this.pollAwaitTimeMillis = pollAwaitTimeMillis;
@@ -187,11 +175,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
Optional.ofNullable(username).ifPresent(builder::username);
Optional.ofNullable(password).ifPresent(builder::password);
Optional.ofNullable(connectionOptions).ifPresent(builder::connectionOptions);
- Optional.ofNullable(errorsLogEnable).ifPresent(builder::errorsLogEnable);
- Optional.ofNullable(errorsTolerance).ifPresent(builder::errorsTolerance);
Optional.ofNullable(copyExisting).ifPresent(builder::copyExisting);
- Optional.ofNullable(copyExistingPipeline).ifPresent(builder::copyExistingPipeline);
- Optional.ofNullable(copyExistingMaxThreads).ifPresent(builder::copyExistingMaxThreads);
Optional.ofNullable(copyExistingQueueSize).ifPresent(builder::copyExistingQueueSize);
Optional.ofNullable(pollMaxBatchSize).ifPresent(builder::pollMaxBatchSize);
Optional.ofNullable(pollAwaitTimeMillis).ifPresent(builder::pollAwaitTimeMillis);
@@ -245,11 +229,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
database,
collection,
connectionOptions,
- errorsTolerance,
- errorsLogEnable,
copyExisting,
- copyExistingPipeline,
- copyExistingMaxThreads,
copyExistingQueueSize,
pollMaxBatchSize,
pollAwaitTimeMillis,
@@ -280,11 +260,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
&& Objects.equals(database, that.database)
&& Objects.equals(collection, that.collection)
&& Objects.equals(connectionOptions, that.connectionOptions)
- && Objects.equals(errorsTolerance, that.errorsTolerance)
- && Objects.equals(errorsLogEnable, that.errorsLogEnable)
&& Objects.equals(copyExisting, that.copyExisting)
- && Objects.equals(copyExistingPipeline, that.copyExistingPipeline)
- && Objects.equals(copyExistingMaxThreads, that.copyExistingMaxThreads)
&& Objects.equals(copyExistingQueueSize, that.copyExistingQueueSize)
&& Objects.equals(pollMaxBatchSize, that.pollMaxBatchSize)
&& Objects.equals(pollAwaitTimeMillis, that.pollAwaitTimeMillis)
@@ -306,11 +282,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
database,
collection,
connectionOptions,
- errorsTolerance,
- errorsLogEnable,
copyExisting,
- copyExistingPipeline,
- copyExistingMaxThreads,
copyExistingQueueSize,
pollMaxBatchSize,
pollAwaitTimeMillis,
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
index 9af14d86e..7fe19e27f 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
@@ -31,9 +31,17 @@ import java.time.ZoneId;
import java.util.HashSet;
import java.util.Set;
-import static com.ververica.cdc.connectors.mongodb.MongoDBSource.ERROR_TOLERANCE_NONE;
-import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_AWAIT_TIME_MILLIS_DEFAULT;
-import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_MAX_BATCH_SIZE_DEFAULT;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COLLECTION;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.CONNECTION_OPTIONS;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING_QUEUE_SIZE;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.DATABASE;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HOSTS;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.PASSWORD;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_AWAIT_TIME_MILLIS;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.USERNAME;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
@@ -48,145 +56,6 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
private static final String DOCUMENT_ID_FIELD = "_id";
- private static final ConfigOption<String> HOSTS =
- ConfigOptions.key("hosts")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "The comma-separated list of hostname and port pairs of the MongoDB servers. "
- + "eg. localhost:27017,localhost:27018");
-
- private static final ConfigOption<String> USERNAME =
- ConfigOptions.key("username")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "Name of the database user to be used when connecting to MongoDB. "
- + "This is required only when MongoDB is configured to use authentication.");
-
- private static final ConfigOption<String> PASSWORD =
- ConfigOptions.key("password")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "Password to be used when connecting to MongoDB. "
- + "This is required only when MongoDB is configured to use authentication.");
-
- private static final ConfigOption<String> DATABASE =
- ConfigOptions.key("database")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "Name of the database to watch for changes."
- + "The database also supports regular expression "
- + "to monitor multiple databases matches the regular expression."
- + "e.g. db[0-9] .");
-
- private static final ConfigOption<String> COLLECTION =
- ConfigOptions.key("collection")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "Name of the collection in the database to watch for changes."
- + "The collection also supports regular expression "
- + "to monitor multiple collections matches fully-qualified collection identifiers."
- + "e.g. db0\\.coll[0-9] .");
-
- private static final ConfigOption<String> CONNECTION_OPTIONS =
- ConfigOptions.key("connection.options")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "The ampersand-separated MongoDB connection options. "
- + "eg. replicaSet=test&connectTimeoutMS=300000");
-
- private static final ConfigOption<String> ERRORS_TOLERANCE =
- ConfigOptions.key("errors.tolerance")
- .stringType()
- .defaultValue(ERROR_TOLERANCE_NONE)
- .withDescription(
- "Whether to continue processing messages if an error is encountered. "
- + "When set to none, the connector reports an error and blocks further processing "
- + "of the rest of the records when it encounters an error. "
- + "When set to all, the connector silently ignores any bad messages."
- + "Accepted Values: 'none' or 'all'. Default 'none'.");
-
- private static final ConfigOption<Boolean> ERRORS_LOG_ENABLE =
- ConfigOptions.key("errors.log.enable")
- .booleanType()
- .defaultValue(Boolean.TRUE)
- .withDescription(
- "Whether details of failed operations should be written to the log file. "
- + "When set to true, both errors that are tolerated (determined by the errors"
- + ".tolerance setting) "
- + "and not tolerated are written. When set to false, errors that are tolerated "
- + "are omitted.");
-
- private static final ConfigOption<Boolean> COPY_EXISTING =
- ConfigOptions.key("copy.existing")
- .booleanType()
- .defaultValue(Boolean.TRUE)
- .withDescription(
- "Copy existing data from source collections and convert them "
- + "to Change Stream events on their respective topics. Any changes to the data "
- + "that occur during the copy process are applied once the copy is completed.");
-
- private static final ConfigOption<String> COPY_EXISTING_PIPELINE =
- ConfigOptions.key("copy.existing.pipeline")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "An array of JSON objects describing the pipeline operations "
- + "to run when copying existing data. "
- + "This can improve the use of indexes by the copying manager and make copying "
- + "more efficient.");
-
- private static final ConfigOption<Integer> COPY_EXISTING_MAX_THREADS =
- ConfigOptions.key("copy.existing.max.threads")
- .intType()
- .noDefaultValue()
- .withDescription(
- "The number of threads to use when performing the data copy."
- + " Defaults to the number of processors.");
-
- private static final ConfigOption<Integer> COPY_EXISTING_QUEUE_SIZE =
- ConfigOptions.key("copy.existing.queue.size")
- .intType()
- .noDefaultValue()
- .withDescription(
- "The max size of the queue to use when copying data. Defaults to 16000.");
-
- private static final ConfigOption<Integer> POLL_MAX_BATCH_SIZE =
- ConfigOptions.key("poll.max.batch.size")
- .intType()
- .defaultValue(POLL_MAX_BATCH_SIZE_DEFAULT)
- .withDescription(
- "Maximum number of change stream documents "
- + "to include in a single batch when polling for new data. "
- + "This setting can be used to limit the amount of data buffered internally in "
- + "the connector. "
- + "Defaults to 1000.");
-
- private static final ConfigOption<Integer> POLL_AWAIT_TIME_MILLIS =
- ConfigOptions.key("poll.await.time.ms")
- .intType()
- .defaultValue(POLL_AWAIT_TIME_MILLIS_DEFAULT)
- .withDescription(
- "The amount of time to wait before checking for new results on the change stream."
- + "Defaults: 1500.");
-
- private static final ConfigOption<Integer> HEARTBEAT_INTERVAL_MILLIS =
- ConfigOptions.key("heartbeat.interval.ms")
- .intType()
- .noDefaultValue()
- .withDescription(
- "The length of time in milliseconds between sending heartbeat messages."
- + "Heartbeat messages contain the post batch resume token and are sent when no "
- + "source records "
- + "have been published in the specified interval. This improves the resumability "
- + "of the connector "
- + "for low volume namespaces. Use 0 to disable. Defaults to 0.");
-
public static final ConfigOption<String> ROW_KINDS_FILTERED =
ConfigOptions.key("row-kinds-filtered")
.stringType()
@@ -216,9 +85,6 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
final String database = config.getOptional(DATABASE).orElse(null);
final String collection = config.getOptional(COLLECTION).orElse(null);
- final String errorsTolerance = config.get(ERRORS_TOLERANCE);
- final Boolean errorsLogEnable = config.get(ERRORS_LOG_ENABLE);
-
final Integer pollMaxBatchSize = config.get(POLL_MAX_BATCH_SIZE);
final Integer pollAwaitTimeMillis = config.get(POLL_AWAIT_TIME_MILLIS);
@@ -226,8 +92,6 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
config.getOptional(HEARTBEAT_INTERVAL_MILLIS).orElse(null);
final Boolean copyExisting = config.get(COPY_EXISTING);
- final String copyExistingPipeline = config.getOptional(COPY_EXISTING_PIPELINE).orElse(null);
- final Integer copyExistingMaxThreads = config.getOptional(COPY_EXISTING_MAX_THREADS).orElse(null);
final Integer copyExistingQueueSize = config.getOptional(COPY_EXISTING_QUEUE_SIZE).orElse(null);
final String zoneId = context.getConfiguration().get(TableConfigOptions.LOCAL_TIME_ZONE);
@@ -255,11 +119,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
database,
collection,
connectionOptions,
- errorsTolerance,
- errorsLogEnable,
copyExisting,
- copyExistingPipeline,
- copyExistingMaxThreads,
copyExistingQueueSize,
pollMaxBatchSize,
pollAwaitTimeMillis,
@@ -297,11 +157,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
options.add(CONNECTION_OPTIONS);
options.add(DATABASE);
options.add(COLLECTION);
- options.add(ERRORS_TOLERANCE);
- options.add(ERRORS_LOG_ENABLE);
options.add(COPY_EXISTING);
- options.add(COPY_EXISTING_PIPELINE);
- options.add(COPY_EXISTING_MAX_THREADS);
options.add(COPY_EXISTING_QUEUE_SIZE);
options.add(POLL_MAX_BATCH_SIZE);
options.add(POLL_AWAIT_TIME_MILLIS);
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index b7985228b..bae2f8cac 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -776,7 +776,7 @@ The text of each license is the standard Apache 2.0 license.
org.apache.flink:flink-connector-hive_2.11:1.13.5 - Flink : Connectors : Hive (https://github.com/apache/flink/tree/release-1.13.5/flink-connectors/flink-connector-hive), (The Apache Software License, Version 2.0)
org.apache.flink:flink-connector-jdbc_2.11:1.13.5 - Flink : Connectors : JDBC (https://github.com/apache/flink/tree/release-1.13.5/flink-connectors/flink-connector-jdbc), (The Apache Software License, Version 2.0)
org.apache.flink:flink-connector-kafka_2.11:1.13.5 - Flink : Connectors : Kafka (https://github.com/apache/flink/tree/release-1.13.5/flink-connectors/flink-connector-kafka), (The Apache Software License, Version 2.0)
- com.ververica:flink-connector-mongodb-cdc:2.2.1 - flink-connector-mongodb-cdc (https://github.com/ververica/flink-cdc-connectors/tree/release-2.2.1/flink-connector-mongodb-cdc), (The Apache Software License, Version 2.0)
+ com.ververica:flink-connector-mongodb-cdc:2.3.0 - flink-connector-mongodb-cdc (https://github.com/ververica/flink-cdc-connectors/tree/release-2.3.0/flink-connector-mongodb-cdc), (The Apache Software License, Version 2.0)
com.ververica:flink-connector-mysql-cdc:2.2.1 - flink-connector-mysql-cdc (https://github.com/ververica/flink-cdc-connectors/tree/release-2.2.1/flink-connector-mysql-cdc), (The Apache Software License, Version 2.0)
com.ververica:flink-connector-oracle-cdc:2.3.0 - flink-connector-oracle-cdc (https://github.com/ververica/flink-cdc-connectors/tree/release-2.3.0/flink-connector-oracle-cdc/), (The Apache Software License, Version 2.0)
com.ververica:flink-connector-postgres-cdc:2.2.1 - flink-connector-postgres-cdc (https://github.com/ververica/flink-cdc-connectors/tree/release-2.2.1/flink-connector-postgres-cdc), (The Apache Software License, Version 2.0)
diff --git a/pom.xml b/pom.xml
index ce51e28ba..2be54be88 100644
--- a/pom.xml
+++ b/pom.xml
@@ -161,7 +161,7 @@
<flink.connector.sqlserver.cdc.version>2.2.1</flink.connector.sqlserver.cdc.version>
<flink.pulsar.version>1.13.6.2</flink.pulsar.version>
<flink.protobuf.version>2.7.6</flink.protobuf.version>
- <flink.connector.mongodb.cdc.version>2.2.1</flink.connector.mongodb.cdc.version>
+ <flink.connector.mongodb.cdc.version>2.3.0</flink.connector.mongodb.cdc.version>
<flink.connector.oracle.cdc.version>2.3.0</flink.connector.oracle.cdc.version>
<flink.connector.doris.version>1.0.3</flink.connector.doris.version>
<flink.connector.redis>1.1.0</flink.connector.redis>