Posted to commits@seatunnel.apache.org by "hailin0 (via GitHub)" <gi...@apache.org> on 2023/05/09 06:41:19 UTC

[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #4620: [Feature][Connector-v2][Mongodb]Refactor mongodb connector

hailin0 commented on code in PR #4620:
URL: https://github.com/apache/incubator-seatunnel/pull/4620#discussion_r1188081012


##########
docs/en/connector-v2/sink/MongoDB.md:
##########
@@ -2,52 +2,223 @@
 
 > MongoDB sink connector
 
-## Description
+The MongoDB Connector provides the ability to read and write data from and to MongoDB.
+This document describes how to set up the MongoDB connector to run data writers against MongoDB.
 
-Write data to `MongoDB`
+Supported Engines
+-----------------
 
-## Key features
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+Key features
+------------
 
 - [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [cdc](../../concept/connector-v2-features.md)
+
+Dependencies
+------------
+
+In order to use the MongoDB connector, the following dependencies are required.
+They can be downloaded via install-plugin.sh or from the Maven Central repository.
+
+| MongoDB version |                                                  dependency                                                   |
+|-----------------|---------------------------------------------------------------------------------------------------------------|
+| universal       | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-mongodb) |
+
+Data Type Mapping
+-----------------
+
+The following table lists the mapping between SeaTunnel data types and MongoDB BSON types.
+
+| SeaTunnel type | MongoDB BSON type |
+|----------------|-------------------|
+| STRING         | ObjectId          |
+| STRING         | String            |
+| BOOLEAN        | Boolean           |
+| BINARY         | Binary            |
+| INTEGER        | Int32             |
+| TINYINT        | Int32             |
+| SMALLINT       | Int32             |
+| BIGINT         | Int64             |
+| DOUBLE         | Double            |
+| FLOAT          | Double            |
+| DECIMAL        | Decimal128        |
+| DATE           | Date              |
+| TIMESTAMP      | Timestamp[Date]   |
+| ROW            | Object            |
+| ARRAY          | Array             |
+
+Tips:
+1. When SeaTunnel writes DATE or TIMESTAMP values to MongoDB, both are stored as the BSON Date type, but with different precision: values from a SeaTunnel DATE have second-level precision, while values from a SeaTunnel TIMESTAMP keep millisecond-level precision.
+
+2. When using the DECIMAL type, the precision (total number of digits) cannot exceed 34, which is the limit of BSON Decimal128; declare the type accordingly, for example as decimal(34, 18).
+
+Connector Options
+-----------------
+
+|        Option         | Required | Default |   Type   |                                                  Description                                                   |
+|-----------------------|----------|---------|----------|----------------------------------------------------------------------------------------------------------------|
+| uri                   | required | (none)  | String   | The MongoDB connection uri.                                                                                    |
+| database              | required | (none)  | String   | The name of MongoDB database to read or write.                                                                 |
+| collection            | required | (none)  | String   | The name of MongoDB collection to read or write.                                                               |
+| schema                | required | (none)  | String   | MongoDB's BSON and seatunnel data structure mapping                                                            |
+| buffer-flush.max-rows | optional | 1000    | String   | Specifies the maximum number of buffered rows per batch request.                                               |
+| buffer-flush.interval | optional | 30000   | String   | Specifies the maximum interval between flushes of buffered rows, in milliseconds.                              |
+| retry.max             | optional | default | String   | Specifies the maximum number of retries if writing records to the database fails.                              |
+| retry.interval        | optional | 1000    | Duration | Specifies the interval between retries if writing records to the database fails, in milliseconds.              |
+| upsert-enable         | optional | false   | Boolean  | Whether to write documents via upsert mode.                                                                    |
+| upsert-key            | optional | (none)  | List     | The primary keys for upsert. Only valid in upsert mode. Keys are in `["id","name",...]` format for properties. |
+
+How to Create a MongoDB Data Synchronization Job
+------------------------------------------------
+
+The following example demonstrates how to create a data synchronization job that writes randomly generated data to a MongoDB database:
 
-## Options
+```bash
+# Set the basic configuration of the task to be performed
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  checkpoint.interval  = 1000
+}
 
-|      name      |  type  | required | default value |
-|----------------|--------|----------|---------------|
-| uri            | string | yes      | -             |
-| database       | string | yes      | -             |
-| collection     | string | yes      | -             |
-| common-options | config | no       | -             |
+source {
+  FakeSource {
+      row.num = 2
+      bigint.min = 0
+      bigint.max = 10000000
+      split.num = 1
+      split.read-interval = 300
+      schema {
+        fields {
+          c_bigint = bigint
+        }
+      }
+    }
+}
 
-### uri [string]
+sink {
+  MongoDB{
+    uri = "mongodb://user:password@127.0.0.1:27017"
+    database = "test"
+    collection = "test"
+    schema = {
+      fields {
+        _id = string
+        c_bigint = bigint
+        }

Review Comment:
   ```suggestion
         }
   ```
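Tip 2 in the doc diff above limits SeaTunnel DECIMAL values to 34 digits. That number is the precision of BSON Decimal128 (IEEE 754 decimal128), which Java's `MathContext.DECIMAL128` also uses. As a hedged illustration, a pre-write check could look like the sketch below; `Decimal128Check` and its methods are hypothetical and not part of the connector or the MongoDB driver.

```java
import java.math.BigDecimal;
import java.math.MathContext;

/** Hypothetical pre-write check: does a DECIMAL fit in BSON Decimal128 (34 significant digits)? */
final class Decimal128Check {
    /** 34: MathContext.DECIMAL128 uses the same decimal128 precision as BSON Decimal128. */
    static final int MAX_PRECISION = MathContext.DECIMAL128.getPrecision();

    private Decimal128Check() {}

    /** True if a declared decimal(precision, scale) type stays within the 34-digit limit. */
    static boolean fitsDeclaredType(int precision, int scale) {
        return precision >= 1 && precision <= MAX_PRECISION && scale >= 0 && scale <= precision;
    }

    /** True if a concrete value has at most 34 digits of precision (conservative: trailing zeros count). */
    static boolean fitsValue(BigDecimal value) {
        return value.precision() <= MAX_PRECISION;
    }
}
```

With this sketch, `fitsDeclaredType(34, 18)` is true while `fitsDeclaredType(38, 18)` is false, and a 35-digit value fails `fitsValue`.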



##########
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/config/MongodbWriterOptions.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.sink.config;
+
+import lombok.Getter;
+
+import java.io.Serializable;
+
+@Getter
+public class MongodbWriterOptions implements Serializable {

Review Comment:
   move to `org.apache.seatunnel.connectors.seatunnel.mongodb.sink`
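The sink doc diff above lists `buffer-flush.max-rows`, `buffer-flush.interval`, `retry.max`, and `retry.interval`; presumably these are what `MongodbWriterOptions` carries, although the diff shows only the class declaration. The sketch below illustrates the usual semantics those options describe: flush when either the row limit or the time limit is reached, and retry a failed flush a bounded number of times with a fixed pause. All names are hypothetical, the MongoDB bulk write is abstracted behind an interface, and the interval unit is assumed to be milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of buffer-flush and retry semantics; not the SeaTunnel implementation.
 * The interval check only runs when a record arrives; a real writer would also flush on a timer.
 */
final class BufferedBatchWriter<T> {
    /** Stand-in for a MongoDB bulk write; may throw on failure. */
    interface BatchSink<T> {
        void write(List<T> batch) throws Exception;
    }

    private final BatchSink<T> sink;
    private final int maxRows;          // buffer-flush.max-rows
    private final long flushIntervalMs; // buffer-flush.interval (assumed milliseconds)
    private final int retryMax;         // retry.max
    private final long retryIntervalMs; // retry.interval (milliseconds)
    private final List<T> buffer = new ArrayList<>();
    private long lastFlushMs = System.currentTimeMillis();

    BufferedBatchWriter(BatchSink<T> sink, int maxRows, long flushIntervalMs,
                        int retryMax, long retryIntervalMs) {
        this.sink = sink;
        this.maxRows = maxRows;
        this.flushIntervalMs = flushIntervalMs;
        this.retryMax = retryMax;
        this.retryIntervalMs = retryIntervalMs;
    }

    /** Buffers one record and flushes if the row limit or the time limit has been reached. */
    void add(T record) throws Exception {
        buffer.add(record);
        long now = System.currentTimeMillis();
        if (buffer.size() >= maxRows || now - lastFlushMs >= flushIntervalMs) {
            flush();
        }
    }

    /** Writes the buffered batch, retrying up to retryMax extra times before rethrowing. */
    void flush() throws Exception {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = new ArrayList<>(buffer);
        for (int attempt = 0; ; attempt++) {
            try {
                sink.write(batch);
                break; // success
            } catch (Exception e) {
                if (attempt >= retryMax) {
                    throw e; // retries exhausted; the buffer is kept so the caller can decide
                }
                Thread.sleep(retryIntervalMs); // fixed pause between attempts
            }
        }
        buffer.clear();
        lastFlushMs = System.currentTimeMillis();
    }
}
```

Keeping the buffer on final failure lets the caller choose between failing the job and retrying later; the connector may make a different choice.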



##########
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java:
##########
@@ -18,24 +18,37 @@
 package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
 
 import com.google.auto.service.AutoService;
 
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;
+import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;
 
 @AutoService(Factory.class)
 public class MongodbSinkFactory implements TableSinkFactory {
     @Override
     public String factoryIdentifier() {
-        return "MongoDB";
+        return CONNECTOR_IDENTITY;
     }
 
     @Override
     public OptionRule optionRule() {
-        return OptionRule.builder().required(URI, DATABASE, COLLECTION).build();
+        return OptionRule.builder()
+                .required(
+                        MongodbConfig.COLLECTION,
+                        MongodbConfig.DATABASE,
+                        MongodbConfig.COLLECTION,

Review Comment:
   duplicate config `COLLECTION` ?



##########
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/writer/MongodbWriter.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.sink.writer;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbClientProvider;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbCollectionProvider;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentSerializer;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.config.MongodbWriterOptions;
+
+import org.bson.BsonDocument;
+
+import com.mongodb.MongoException;
+import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.WriteModel;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITER_OPERATION_FAILED;
+
+@Slf4j
+public class MongodbWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {

Review Comment:
   move to `org.apache.seatunnel.connectors.seatunnel.mongodb.sink`
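The writer is where the doc's "Idempotent Writes" behaviour would live. The effect of upsert keys can be shown without a MongoDB server: the hypothetical sketch below derives a filter from the `upsert-key` fields of each record and uses it as the key of an in-memory `Map` standing in for a collection, so replaying records after a failure still leaves one document per key. In the real driver, the equivalent would be a `ReplaceOneModel` or `UpdateOneModel` whose options set `upsert(true)`, sent in a bulk write; which of these the PR uses is not visible in this diff.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of upsert-by-key idempotence; a Map stands in for a MongoDB collection. */
final class UpsertSketch {
    private UpsertSketch() {}

    /** Builds the filter {k1: v1, k2: v2, ...} from the upsert-key fields of a record. */
    static Map<String, Object> filterFor(Map<String, Object> record, List<String> upsertKeys) {
        Map<String, Object> filter = new LinkedHashMap<>();
        for (String key : upsertKeys) {
            if (!record.containsKey(key)) {
                throw new IllegalArgumentException("record is missing upsert key: " + key);
            }
            filter.put(key, record.get(key));
        }
        return filter;
    }

    /** Upsert: replace the document matching the filter, or insert it if none matches. */
    static void upsert(Map<Map<String, Object>, Map<String, Object>> collection,
                       Map<String, Object> record, List<String> upsertKeys) {
        collection.put(filterFor(record, upsertKeys), new LinkedHashMap<>(record));
    }
}
```

Replaying the same record is a no-op on the document count, which is what makes checkpoint recovery safe in upsert mode.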



##########
docs/en/connector-v2/sink/MongoDB.md:
##########
@@ -2,52 +2,223 @@
 
 > MongoDB sink connector
 
-## Description
+The MongoDB Connector provides the ability to read and write data from and to MongoDB.
+This document describes how to set up the MongoDB connector to run data writers against MongoDB.
 
-Write data to `MongoDB`
+Supported Engines
+-----------------
 
-## Key features
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+Key features
+------------
 
 - [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [cdc](../../concept/connector-v2-features.md)
+
+Dependencies
+------------
+
+In order to use the MongoDB connector, the following dependencies are required.
+They can be downloaded via install-plugin.sh or from the Maven Central repository.
+
+| MongoDB version |                                                  dependency                                                   |
+|-----------------|---------------------------------------------------------------------------------------------------------------|
+| universal       | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-mongodb) |
+
+Data Type Mapping
+-----------------
+
+The following table lists the mapping between SeaTunnel data types and MongoDB BSON types.
+
+| SeaTunnel type | MongoDB BSON type |
+|----------------|-------------------|
+| STRING         | ObjectId          |
+| STRING         | String            |
+| BOOLEAN        | Boolean           |
+| BINARY         | Binary            |
+| INTEGER        | Int32             |
+| TINYINT        | Int32             |
+| SMALLINT       | Int32             |
+| BIGINT         | Int64             |
+| DOUBLE         | Double            |
+| FLOAT          | Double            |
+| DECIMAL        | Decimal128        |
+| DATE           | Date              |
+| TIMESTAMP      | Timestamp[Date]   |
+| ROW            | Object            |
+| ARRAY          | Array             |
+
+Tips:
+1. When SeaTunnel writes DATE or TIMESTAMP values to MongoDB, both are stored as the BSON Date type, but with different precision: values from a SeaTunnel DATE have second-level precision, while values from a SeaTunnel TIMESTAMP keep millisecond-level precision.
+
+2. When using the DECIMAL type, the precision (total number of digits) cannot exceed 34, which is the limit of BSON Decimal128; declare the type accordingly, for example as decimal(34, 18).
+
+Connector Options
+-----------------
+
+|        Option         | Required | Default |   Type   |                                                  Description                                                   |
+|-----------------------|----------|---------|----------|----------------------------------------------------------------------------------------------------------------|
+| uri                   | required | (none)  | String   | The MongoDB connection uri.                                                                                    |
+| database              | required | (none)  | String   | The name of MongoDB database to read or write.                                                                 |
+| collection            | required | (none)  | String   | The name of MongoDB collection to read or write.                                                               |
+| schema                | required | (none)  | String   | MongoDB's BSON and seatunnel data structure mapping                                                            |
+| buffer-flush.max-rows | optional | 1000    | String   | Specifies the maximum number of buffered rows per batch request.                                               |
+| buffer-flush.interval | optional | 30000   | String   | Specifies the maximum interval between flushes of buffered rows, in milliseconds.                              |
+| retry.max             | optional | default | String   | Specifies the maximum number of retries if writing records to the database fails.                              |
+| retry.interval        | optional | 1000    | Duration | Specifies the interval between retries if writing records to the database fails, in milliseconds.              |
+| upsert-enable         | optional | false   | Boolean  | Whether to write documents via upsert mode.                                                                    |
+| upsert-key            | optional | (none)  | List     | The primary keys for upsert. Only valid in upsert mode. Keys are in `["id","name",...]` format for properties. |
+
+How to Create a MongoDB Data Synchronization Job
+------------------------------------------------
+
+The following example demonstrates how to create a data synchronization job that writes randomly generated data to a MongoDB database:
 
-## Options
+```bash
+# Set the basic configuration of the task to be performed
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  checkpoint.interval  = 1000
+}
 
-|      name      |  type  | required | default value |
-|----------------|--------|----------|---------------|
-| uri            | string | yes      | -             |
-| database       | string | yes      | -             |
-| collection     | string | yes      | -             |
-| common-options | config | no       | -             |
+source {
+  FakeSource {
+      row.num = 2
+      bigint.min = 0
+      bigint.max = 10000000
+      split.num = 1
+      split.read-interval = 300
+      schema {
+        fields {
+          c_bigint = bigint
+        }
+      }
+    }
+}
 
-### uri [string]
+sink {
+  MongoDB{
+    uri = "mongodb://user:password@127.0.0.1:27017"
+    database = "test"
+    collection = "test"
+    schema = {
+      fields {
+        _id = string
+        c_bigint = bigint
+        }
+    }
+  }
+}
+```
 
-uri to write to mongoDB
+Parameter interpretation
+------------------------
 
-### database [string]
+**MongoDB database connection URI examples**
 
-database to write to mongoDB
+Unauthenticated single node connection:
 
-### collection [string]
+```bash
+mongodb://127.0.0.0:27017/mydb
+```
 
-collection to write to mongoDB
+Replica set connection:
 
-### common options
+```bash
+mongodb://127.0.0.0:27017/mydb?replicaSet=xxx
+```
 
-Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details
+Authenticated replica set connection:
 
-## Example
+```bash
+mongodb://admin:password@127.0.0.0:27017/mydb?replicaSet=xxx&authSource=admin
+```
+
+Multi-node replica set connection:
 
 ```bash
-mongodb {
-    uri = "mongodb://username:password@127.0.0.1:27017/mypost?retryWrites=true&writeConcern=majority"
-    database = "mydatabase"
-    collection = "mycollection"
+mongodb://127.0.0.1:27017,127.0.0.2:27017,127.0.0.3:27017/mydb?replicaSet=xxx
+```
+
+Sharded cluster connection:
+
+```bash
+mongodb://127.0.0.0:27017/mydb
+```
+
+Multiple mongos connections:
+
+```bash
+mongodb://192.168.0.1:27017,192.168.0.2:27017,192.168.0.3:27017/mydb
+```
+
+Note: The username and password in the URI must be URL-encoded before being concatenated into the connection string.
+
+**Buffer Flush**
+
+```bash
+sink {
+  MongoDB {
+    uri = "mongodb://user:password@127.0.0.1:27017"
+    database = "test_db"
+    collection = "users"
+    buffer-flush.max-rows = 2000
+    buffer-flush.interval = 1000
+    schema = {
+      fields {
+        _id = string
+        id = bigint
+        status = string
+      }
+    }
+  }
+}
+```
+
+**Why are transactions not recommended?**
+
+Although MongoDB has fully supported multi-document transactions since version 4.2, that does not mean they should be used freely.
+Transactions bring locking, cross-node coordination, and extra overhead, all of which cost performance.
+The guiding principle is therefore to avoid them where possible.
+A well-designed write path removes most of the need for them.
+
+**Idempotent Writes**
+
+Declaring an explicit primary key and writing in upsert mode makes writes idempotent, which yields exactly-once results even when records are written more than once.
+
+When `upsert-enable` is true and `upsert-key` is defined, the MongoDB sink writes with upsert semantics instead of plain inserts. The fields listed in `upsert-key` are combined and used as the MongoDB reserved primary key, so writing the same record again updates the existing document instead of inserting a duplicate.
+If a job fails, SeaTunnel recovers from the last successful checkpoint and reprocesses the records written since then, so some records may be written twice. Upsert mode is strongly recommended: it prevents primary-key constraint violations and duplicate documents when records are reprocessed.
+
+```bash
+sink {
+  MongoDB {
+    uri = "mongodb://user:password@127.0.0.1:27017"
+    database = "test_db"
+    collection = "users"
+    upsert-enable = true
+    upsert-key = ["name","status"]
+    schema = {
+      fields {
+        _id = string
+        name = string
+        status = string
+      }
+    }
+  }
 }
 ```
 
 ## Changelog
 
 ### 2.2.0-beta 2022-09-26
 
-- Add MongoDB Sink Connector
+- Add MongoDB Source Connector
+
+### Next Version
+
+- [Feature]Refactor mongodb source connector([4380](https://github.com/apache/incubator-seatunnel/pull/4380))

Review Comment:
   ```suggestion
   - [Feature]Refactor mongodb source connector([4620](https://github.com/apache/incubator-seatunnel/pull/4620))
   ```
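The URI section of this doc diff notes that the username and password must be URL-encoded before they are concatenated into the connection string. A small sketch using the JDK's `URLEncoder` (the `Charset` overload needs Java 10+) follows; `MongoUriBuilder` is a hypothetical helper. `URLEncoder` implements form encoding, where a space becomes `+`, so the sketch converts `+` to `%20`; a literal `+` in the input is already emitted as `%2B` and is unaffected.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that builds a MongoDB URI with percent-encoded credentials. */
final class MongoUriBuilder {
    private MongoUriBuilder() {}

    /** Percent-encodes a credential; form-style '+' for spaces is converted to %20. */
    static String encode(String credential) {
        return URLEncoder.encode(credential, StandardCharsets.UTF_8).replace("+", "%20");
    }

    /** Builds mongodb://user:password@hosts/database with both credentials encoded. */
    static String build(String user, String password, String hosts, String database) {
        return "mongodb://" + encode(user) + ":" + encode(password) + "@" + hosts + "/" + database;
    }
}
```

For example, a password of `p@ss:w/rd` becomes `p%40ss%3Aw%2Frd`, so the `@`, `:` and `/` no longer confuse the URI parser.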



##########
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/reader/MongodbReader.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.source.reader;
+
+import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbClientProvider;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentDeserializer;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.source.config.MongodbReadOptions;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.source.split.MongoSplit;
+
+import org.bson.BsonDocument;
+
+import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoCursor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.TimeUnit;
+
+/** MongoReader reads MongoDB by splits (queries). */
+@Slf4j
+public class MongodbReader implements SourceReader<SeaTunnelRow, MongoSplit> {
+
+    private final Queue<MongoSplit> pendingSplits;
+
+    private final DocumentDeserializer<SeaTunnelRow> deserializer;
+
+    private final SourceReader.Context context;
+
+    private final MongodbClientProvider clientProvider;
+
+    private MongoCursor<BsonDocument> cursor;
+
+    private MongoSplit currentSplit;
+
+    private final MongodbReadOptions readOptions;
+
+    private volatile boolean noMoreSplit;
+
+    public MongodbReader(
+            SourceReader.Context context,
+            MongodbClientProvider clientProvider,
+            DocumentDeserializer<SeaTunnelRow> deserializer,
+            MongodbReadOptions mongodbReadOptions) {
+        this.deserializer = deserializer;
+        this.context = context;
+        this.clientProvider = clientProvider;
+        pendingSplits = new ConcurrentLinkedDeque<>();
+        this.readOptions = mongodbReadOptions;
+    }
+
+    @Override
+    public void open() throws Exception {
+        if (cursor != null) {
+            cursor.close();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (cursor != null) {
+            cursor.close();
+        }
+    }
+
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        synchronized (output.getCheckpointLock()) {
+            currentSplit = pendingSplits.poll();
+            if (null != currentSplit) {
+                if (cursor != null) {
+                    // current split is in-progress
+                    return;
+                }
+                log.info("Prepared to read split {}", currentSplit.splitId());
+                FindIterable<BsonDocument> rs =
+                        clientProvider
+                                .getDefaultCollection()
+                                .find(currentSplit.getQuery())
+                                .projection(currentSplit.getProjection())
+                                .batchSize(readOptions.getFetchSize())
+                                .noCursorTimeout(readOptions.isNoCursorTimeout())
+                                .maxTime(readOptions.getMaxTimeMS(), TimeUnit.MILLISECONDS);
+                cursor = rs.iterator();
+                while (cursor.hasNext()) {
+                    SeaTunnelRow deserialize = deserializer.deserialize(cursor.next());
+                    output.collect(deserialize);
+                }
+                closeCurrentSplit();
+            }
+            if (noMoreSplit && pendingSplits.isEmpty()) {
+                // signal to the source that we have reached the end of the data.
+                log.info("Closed the bounded mongodb source");
+                context.signalNoMoreElement();
+            }
+        }
+    }
+
+    @Override
+    public List<MongoSplit> snapshotState(long checkpointId) throws Exception {
+        return new ArrayList<>(Collections.singleton(currentSplit));

Review Comment:
   should store `pendingSplits`?
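One reading of this comment: the checkpoint should hold every split the reader still owns, that is, the in-progress `currentSplit` plus everything left in `pendingSplits`. As quoted, `new ArrayList<>(Collections.singleton(currentSplit))` drops the queued splits and, when no split is in progress, returns a list containing `null`. The framework-free sketch below is hypothetical (`S` stands in for `MongoSplit`; the method names other than `snapshotState` are invented for illustration) and is not the PR's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;

/** Hypothetical reader state; S stands in for MongoSplit. */
final class ReaderStateSketch<S> {
    private final Queue<S> pendingSplits = new ConcurrentLinkedDeque<>();
    private volatile S currentSplit;

    /** Called when the enumerator assigns splits to this reader. */
    void addSplits(List<S> splits) {
        pendingSplits.addAll(splits);
    }

    /** Marks the next pending split as in progress (null if none is left). */
    S startNext() {
        currentSplit = pendingSplits.poll();
        return currentSplit;
    }

    /** Called after the in-progress split has been fully read. */
    void finishCurrent() {
        currentSplit = null;
    }

    /** Snapshot: the in-progress split first (if any), then every split not yet started. */
    List<S> snapshotState(long checkpointId) {
        List<S> state = new ArrayList<>();
        if (currentSplit != null) {
            state.add(currentSplit);
        }
        state.addAll(pendingSplits);
        return state;
    }
}
```

Restoring an in-progress split re-reads it from the beginning, so that split is delivered at least once unless the split also records a resume position.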



##########
docs/en/connector-v2/source/MongoDB.md:
##########
@@ -91,5 +446,5 @@ mongodb {
 
 ### Next Version
 
-- common-options is not a required option
+- [Feature]Refactor mongodb source connector([4380](https://github.com/apache/incubator-seatunnel/pull/4380))

Review Comment:
   ```suggestion
   - [Feature]Refactor mongodb source connector([4620](https://github.com/apache/incubator-seatunnel/pull/4620))
   ```



##########
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceFactory.java:
##########
@@ -22,31 +22,45 @@
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.source.split.MongoSplit;
 
 import com.google.auto.service.AutoService;
 
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.MATCHQUERY;
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;
+import java.util.ArrayList;
+
+import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;
 
 @AutoService(Factory.class)
 public class MongodbSourceFactory implements TableSourceFactory {
     @Override
     public String factoryIdentifier() {
-        return "MongoDB";
+        return CONNECTOR_IDENTITY;
     }
 
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(URI, DATABASE, COLLECTION, CatalogTableUtil.SCHEMA)
-                .optional(MATCHQUERY)
+                .required(
+                        MongodbConfig.COLLECTION,
+                        MongodbConfig.DATABASE,
+                        MongodbConfig.COLLECTION,

Review Comment:
   duplicate config `COLLECTION` ?
   
   missing `uri`?



##########
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java:
##########
@@ -18,24 +18,37 @@
 package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
 
 import com.google.auto.service.AutoService;
 
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;
+import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;
 
 @AutoService(Factory.class)
 public class MongodbSinkFactory implements TableSinkFactory {
     @Override
     public String factoryIdentifier() {
-        return "MongoDB";
+        return CONNECTOR_IDENTITY;
     }
 
     @Override
     public OptionRule optionRule() {
-        return OptionRule.builder().required(URI, DATABASE, COLLECTION).build();
+        return OptionRule.builder()
+                .required(
+                        MongodbConfig.COLLECTION,
+                        MongodbConfig.DATABASE,
+                        MongodbConfig.COLLECTION,

Review Comment:
   missing `uri` ?
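Both factory comments point at the same fix: the required list should name each option once and include `uri`. SeaTunnel's `OptionRule.builder().required(...)` is not reproduced here; as a framework-free illustration, a check like the following (a hypothetical helper, with the expected keys taken from the options the doc marks as required) would catch both the duplicated `COLLECTION` and the missing `uri` in a unit test.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical check for a list of required option keys: no duplicates, nothing missing. */
final class RequiredOptionsCheck {
    private RequiredOptionsCheck() {}

    /** Returns human-readable problems; an empty list means the declaration looks consistent. */
    static List<String> problems(List<String> declared, Set<String> expected) {
        List<String> problems = new ArrayList<>();
        Set<String> seen = new LinkedHashSet<>();
        for (String key : declared) {
            if (!seen.add(key)) {
                problems.add("duplicate required option: " + key);
            }
        }
        for (String key : expected) {
            if (!seen.contains(key)) {
                problems.add("missing required option: " + key);
            }
        }
        return problems;
    }
}
```

Run against the quoted `required(COLLECTION, DATABASE, COLLECTION, ...)` prefix with `uri`, `database`, and `collection` expected, it reports one duplicate and one missing option.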


