Posted to commits@seatunnel.apache.org by "MonsterChenzhuo (via GitHub)" <gi...@apache.org> on 2023/04/19 08:32:34 UTC

[GitHub] [incubator-seatunnel] MonsterChenzhuo opened a new pull request, #4620: [Feature][Connector-v2][Mongodb]Refactor mongodb source connector

MonsterChenzhuo opened a new pull request, #4620:
URL: https://github.com/apache/incubator-seatunnel/pull/4620

   <!--
   
   Thank you for contributing to SeaTunnel! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [GITHUB issue](https://github.com/apache/incubator-seatunnel/issues).
   
     - Name the pull request in the form "[Feature] [component] Title of the pull request", where *Feature* can be replaced by `Hotfix`, `Bug`, etc.
   
     - Minor fixes should be named following this pattern: `[hotfix] [docs] Fix typo in README.md doc`.
   
   -->
   
   ## Purpose of this pull request
   
   <!-- Describe the purpose of this pull request. For example: This pull request adds checkstyle plugin.-->
   
   ## Check list
   close:#4280
   * [ ] Code changes are covered with tests, or no tests are needed because:
   * [ ] If your PR adds any new Jar binary package, please add a License Notice according to the
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   * [ ] If you are contributing the connector code, please check that the following files are updated:
     1. Update the change log in the connector document. For more details, refer to [connector-v2](https://github.com/apache/incubator-seatunnel/tree/dev/docs/en/connector-v2)
     2. Update [plugin-mapping.properties](https://github.com/apache/incubator-seatunnel/blob/dev/plugin-mapping.properties) and add the new connector information to it
     3. Update the pom file of [seatunnel-dist](https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-dist/pom.xml)
   * [ ] Update the [`release-note`](https://github.com/apache/incubator-seatunnel/blob/dev/release-note.md).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] TyrantLucifer commented on a diff in pull request #4620: [Feature][Connector-v2][Mongodb]Refactor mongodb connector

Posted by "TyrantLucifer (via GitHub)" <gi...@apache.org>.
TyrantLucifer commented on code in PR #4620:
URL: https://github.com/apache/incubator-seatunnel/pull/4620#discussion_r1193994015


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java:
##########
@@ -67,36 +58,145 @@
 @Slf4j
 public class MongodbIT extends TestSuiteBase implements TestResource {
 
-    private static final String MONGODB_IMAGE = "mongo:6.0.5";
+    private static final Random random = new Random();

Review Comment:
   ```suggestion
       private static final Random RANDOM = new Random();
   ```





[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #4620: [Feature][Connector-v2][Mongodb]Refactor mongodb connector

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on code in PR #4620:
URL: https://github.com/apache/incubator-seatunnel/pull/4620#discussion_r1174526821


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_to_assert.conf:
##########
@@ -19,55 +19,102 @@
 ######
 
 env {
-  # You can set flink configuration here
   execution.parallelism = 1
   job.mode = "BATCH"
-  #execution.checkpoint.interval = 10000
-  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
 }
 
 source {
-  MongoDB {
-    uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
+  Mongodb {
+    uri = "mongodb://e2e_mongodb:27017/test_db"
     database = "test_db"
-    collection = "source_table"
+    collection = "sink_table"
+    no-timeout = true

Review Comment:
   Please unify the parameter naming style; `no-timeout` should follow the same convention as the other options.



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_to_assert.conf:
##########
@@ -19,55 +19,102 @@
 ######
 
 env {
-  # You can set flink configuration here
   execution.parallelism = 1
   job.mode = "BATCH"
-  #execution.checkpoint.interval = 10000
-  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
 }
 
 source {
-  MongoDB {
-    uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
+  Mongodb {

Review Comment:
   Why was this name changed from `MongoDB` to `Mongodb`?





[GitHub] [incubator-seatunnel] MonsterChenzhuo commented on a diff in pull request #4620: [Feature][Connector-v2][Mongodb]Refactor mongodb connector

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #4620:
URL: https://github.com/apache/incubator-seatunnel/pull/4620#discussion_r1174539525


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_to_assert.conf:
##########
@@ -19,55 +19,102 @@
 ######
 
 env {
-  # You can set flink configuration here
   execution.parallelism = 1
   job.mode = "BATCH"
-  #execution.checkpoint.interval = 10000
-  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
 }
 
 source {
-  MongoDB {
-    uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
+  Mongodb {

Review Comment:
   A source only reads data, so configuring `retryWrites` and `writeConcern` in its URI is clearly unreasonable.





[GitHub] [incubator-seatunnel] TyrantLucifer commented on a diff in pull request #4620: [Feature][Connector-v2][Mongodb]Refactor mongodb connector

Posted by "TyrantLucifer (via GitHub)" <gi...@apache.org>.
TyrantLucifer commented on code in PR #4620:
URL: https://github.com/apache/incubator-seatunnel/pull/4620#discussion_r1191840276


##########
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java:
##########
@@ -20,70 +20,100 @@
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
 import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
-import org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataDocumentSerializer;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataToBsonConverters;
 
 import com.google.auto.service.AutoService;
 
 import java.io.IOException;
 
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;
+import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;
 
 @AutoService(SeaTunnelSink.class)
 public class MongodbSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
 
-    private SeaTunnelRowType rowType;
+    private MongodbWriterOptions options;
 
-    private MongodbConfig params;
+    private SeaTunnelRowType seaTunnelRowType;
 
     @Override
-    public String getPluginName() {
-        return "MongoDB";
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        if (pluginConfig.hasPath(MongodbConfig.URI.key())
+                && pluginConfig.hasPath(MongodbConfig.DATABASE.key())
+                && pluginConfig.hasPath(MongodbConfig.COLLECTION.key())) {
+            String connection = pluginConfig.getString(MongodbConfig.URI.key());
+            String database = pluginConfig.getString(MongodbConfig.DATABASE.key());
+            String collection = pluginConfig.getString(MongodbConfig.COLLECTION.key());
+            MongodbWriterOptions.Builder builder =
+                    MongodbWriterOptions.builder()
+                            .withConnectString(connection)
+                            .withDatabase(database)
+                            .withCollection(collection)
+                            .withFlushSize(
+                                    pluginConfig.hasPath(MongodbConfig.BUFFER_FLUSH_MAX_ROWS.key())

Review Comment:
   Use if-else instead of the ternary expression.
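
One possible reading of this suggestion, as a minimal sketch rather than the PR's actual code: resolve each optional setting in its own `if`/`else` block before building the options object, instead of inlining a ternary in every builder call. The `Options` class, the key names, and the default values below are hypothetical, and a plain `Map` stands in for the typesafe `Config` object.

```java
import java.util.HashMap;
import java.util.Map;

public class IfElseOptionsSketch {

    // Hypothetical keys and defaults, chosen only for illustration.
    static final String FLUSH_SIZE_KEY = "flush.size";
    static final String BATCH_INTERVAL_KEY = "batch.interval.ms";
    static final int DEFAULT_FLUSH_SIZE = 1000;
    static final long DEFAULT_BATCH_INTERVAL_MS = 1000L;

    // Hypothetical stand-in for the writer options built in the PR.
    static final class Options {
        final int flushSize;
        final long batchIntervalMs;

        Options(int flushSize, long batchIntervalMs) {
            this.flushSize = flushSize;
            this.batchIntervalMs = batchIntervalMs;
        }
    }

    // A Map stands in for the typesafe Config; containsKey plays the role of hasPath.
    static Options fromConfig(Map<String, Object> config) {
        int flushSize;
        if (config.containsKey(FLUSH_SIZE_KEY)) {
            flushSize = ((Number) config.get(FLUSH_SIZE_KEY)).intValue();
        } else {
            flushSize = DEFAULT_FLUSH_SIZE;
        }

        long batchIntervalMs;
        if (config.containsKey(BATCH_INTERVAL_KEY)) {
            batchIntervalMs = ((Number) config.get(BATCH_INTERVAL_KEY)).longValue();
        } else {
            batchIntervalMs = DEFAULT_BATCH_INTERVAL_MS;
        }

        return new Options(flushSize, batchIntervalMs);
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put(FLUSH_SIZE_KEY, 500);
        Options options = fromConfig(config);
        // The flush size comes from the config, the interval falls back to its default.
        System.out.println(
                "flushSize=" + options.flushSize + ", batchIntervalMs=" + options.batchIntervalMs);
    }
}
```

Each option then reads as one short block, which keeps the builder chain flat and makes it easier to add validation for a single option later.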



##########
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java:
##########
@@ -20,70 +20,100 @@
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
 import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
-import org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataDocumentSerializer;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataToBsonConverters;
 
 import com.google.auto.service.AutoService;
 
 import java.io.IOException;
 
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;
+import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;
 
 @AutoService(SeaTunnelSink.class)
 public class MongodbSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
 
-    private SeaTunnelRowType rowType;
+    private MongodbWriterOptions options;
 
-    private MongodbConfig params;
+    private SeaTunnelRowType seaTunnelRowType;
 
     @Override
-    public String getPluginName() {
-        return "MongoDB";
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        if (pluginConfig.hasPath(MongodbConfig.URI.key())
+                && pluginConfig.hasPath(MongodbConfig.DATABASE.key())
+                && pluginConfig.hasPath(MongodbConfig.COLLECTION.key())) {
+            String connection = pluginConfig.getString(MongodbConfig.URI.key());
+            String database = pluginConfig.getString(MongodbConfig.DATABASE.key());
+            String collection = pluginConfig.getString(MongodbConfig.COLLECTION.key());
+            MongodbWriterOptions.Builder builder =
+                    MongodbWriterOptions.builder()
+                            .withConnectString(connection)
+                            .withDatabase(database)
+                            .withCollection(collection)
+                            .withFlushSize(
+                                    pluginConfig.hasPath(MongodbConfig.BUFFER_FLUSH_MAX_ROWS.key())
+                                            ? pluginConfig.getInt(
+                                                    MongodbConfig.BUFFER_FLUSH_MAX_ROWS.key())
+                                            : MongodbConfig.BUFFER_FLUSH_MAX_ROWS.defaultValue())
+                            .withBatchIntervalMs(
+                                    pluginConfig.hasPath(MongodbConfig.BUFFER_FLUSH_INTERVAL.key())
+                                            ? pluginConfig.getLong(
+                                                    MongodbConfig.BUFFER_FLUSH_INTERVAL.key())
+                                            : MongodbConfig.BUFFER_FLUSH_INTERVAL.defaultValue())
+                            .withUpsertKey(
+                                    pluginConfig.hasPath(MongodbConfig.UPSERT_KEY.key())
+                                            ? pluginConfig
+                                                    .getStringList(MongodbConfig.UPSERT_KEY.key())
+                                                    .toArray(new String[0])
+                                            : new String[] {})
+                            .withUpsertEnable(
+                                    pluginConfig.hasPath(MongodbConfig.UPSERT_ENABLE.key())
+                                            ? pluginConfig.getBoolean(
+                                                    MongodbConfig.UPSERT_ENABLE.key())
+                                            : MongodbConfig.UPSERT_ENABLE.defaultValue())
+                            .withRetryMax(
+                                    pluginConfig.hasPath(MongodbConfig.RETRY_MAX.key())
+                                            ? pluginConfig.getInt(MongodbConfig.RETRY_MAX.key())
+                                            : MongodbConfig.RETRY_MAX.defaultValue())
+                            .withRetryInterval(
+                                    pluginConfig.hasPath(MongodbConfig.RETRY_INTERVAL.key())
+                                            ? pluginConfig.getLong(
+                                                    MongodbConfig.RETRY_INTERVAL.key())
+                                            : MongodbConfig.RETRY_INTERVAL.defaultValue());
+            this.options = builder.build();
+        }
     }
 
     @Override
-    public void prepare(Config config) throws PrepareFailException {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(config, URI.key(), DATABASE.key(), COLLECTION.key());
-        if (!result.isSuccess()) {
-            throw new MongodbConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, result.getMsg()));
-        }
-
-        this.params = MongodbConfig.buildWithConfig(config);
+    public String getPluginName() {
+        return CONNECTOR_IDENTITY;
     }
 
     @Override
-    public void setTypeInfo(SeaTunnelRowType rowType) {
-        this.rowType = rowType;
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
     }
 
     @Override
     public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return rowType;
+        return seaTunnelRowType;
     }
 
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context)
             throws IOException {
-        boolean useSimpleTextSchema = CatalogTableUtil.buildSimpleTextSchema().equals(rowType);
-        return new MongodbSinkWriter(rowType, useSimpleTextSchema, params);
+        return new MongodbWriter(
+                new RowDataDocumentSerializer(

Review Comment:
   Pass the context into the writer; it will be used to collect dirty records.
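
A rough sketch of what passing the context through could look like. The diff shows that `createWriter` receives a `SinkWriter.Context`, but how dirty records will be collected is not described in this thread, so everything below is hypothetical: the `Context` interface is a stand-in rather than the SeaTunnel one, and the rejection rule is invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class WriterContextSketch {

    // Hypothetical stand-in for the sink writer context; not the SeaTunnel interface.
    interface Context {
        int subtaskIndex();
    }

    // Hypothetical writer that keeps the context so rejected records can be attributed.
    static final class Writer {
        private final Context context;
        private final List<String> dirtyRecords = new ArrayList<>();

        Writer(Context context) {
            this.context = context;
        }

        void write(String record) {
            // Invented rule for the sketch: null or empty records count as dirty.
            if (record == null || record.isEmpty()) {
                dirtyRecords.add("subtask-" + context.subtaskIndex() + ": rejected empty record");
                return;
            }
            // A real writer would buffer and flush the record here.
        }

        List<String> dirtyRecords() {
            return dirtyRecords;
        }
    }

    public static void main(String[] args) {
        Writer writer = new Writer(() -> 0);
        writer.write("{\"id\": 1}");
        writer.write("");
        System.out.println(writer.dirtyRecords());
    }
}
```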



##########
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java:
##########
@@ -20,57 +20,125 @@
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportColumnProjection;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
-import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
-import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
 import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
-import org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbClientProvider;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbCollectionProvider;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentDeserializer;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentRowDataDeserializer;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.source.config.MongodbReadOptions;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.source.enumerator.MongodbSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.source.reader.MongodbReader;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.source.split.MongoSplit;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.source.split.MongoSplitStrategy;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.source.split.SamplingSplitStrategy;
+
+import org.bson.BsonDocument;
 
 import com.google.auto.service.AutoService;
 
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;
+import java.util.ArrayList;
+
+import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;
 
 @AutoService(SeaTunnelSource.class)
-public class MongodbSource extends AbstractSingleSplitSource<SeaTunnelRow> {
+public class MongodbSource
+        implements SeaTunnelSource<SeaTunnelRow, MongoSplit, ArrayList<MongoSplit>>,
+                SupportColumnProjection {
+
+    private static final long serialVersionUID = 1L;
+
+    private MongodbClientProvider clientProvider;
+
+    private DocumentDeserializer<SeaTunnelRow> deserializer;
+
+    private MongoSplitStrategy splitStrategy;
 
     private SeaTunnelRowType rowType;
 
-    private MongodbConfig params;
+    private MongodbReadOptions mongodbReadOptions;
 
     @Override
     public String getPluginName() {
-        return "MongoDB";
+        return CONNECTOR_IDENTITY;
     }
 
     @Override
-    public void prepare(Config config) throws PrepareFailException {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(config, URI.key(), DATABASE.key(), COLLECTION.key());
-        if (!result.isSuccess()) {
-            throw new MongodbConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SOURCE, result.getMsg()));
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        if (pluginConfig.hasPath(MongodbConfig.URI.key())
+                && pluginConfig.hasPath(MongodbConfig.DATABASE.key())
+                && pluginConfig.hasPath(MongodbConfig.COLLECTION.key())) {
+            String connection = pluginConfig.getString(MongodbConfig.URI.key());
+            String database = pluginConfig.getString(MongodbConfig.DATABASE.key());
+            String collection = pluginConfig.getString(MongodbConfig.COLLECTION.key());
+            clientProvider =
+                    MongodbCollectionProvider.getBuilder()
+                            .connectionString(connection)
+                            .database(database)
+                            .collection(collection)
+                            .build();
         }
-        this.params = MongodbConfig.buildWithConfig(config);
-        if (config.hasPath(CatalogTableUtil.SCHEMA.key())) {
-            this.rowType = CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
+        if (pluginConfig.hasPath(CatalogTableUtil.SCHEMA.key())) {
+            this.rowType = CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
         } else {
             this.rowType = CatalogTableUtil.buildSimpleTextSchema();
         }
+        deserializer =
+                new DocumentRowDataDeserializer(
+                        rowType.getFieldNames(),
+                        rowType,
+                        pluginConfig.hasPath(MongodbConfig.FLAT_SYNC_STRING.key())
+                                ? pluginConfig.getBoolean(MongodbConfig.FLAT_SYNC_STRING.key())

Review Comment:
   The same as above



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/matchIT/mongodb_matchQuery_source_to_assert.conf:
##########
@@ -0,0 +1,95 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config

Review Comment:
   the same as above



##########
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongoKeyExtractor.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
+
+import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.SerializableFunction;
+
+import org.bson.BsonDocument;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+public class MongoKeyExtractor implements SerializableFunction<BsonDocument, BsonDocument> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static String[] upsertKey;

Review Comment:
   Why is `upsertKey` static?
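
To illustrate the concern behind this question, here is a minimal sketch. It assumes the field is assigned in the constructor, which the diff excerpt does not show. A static field is shared by every instance of the class, so a second extractor would overwrite the key of the first. Java serialization also skips static fields, so the value would not travel with a serialized instance. The class names below are hypothetical.

```java
import java.util.Arrays;

public class StaticFieldSketch {

    // Mirrors the questioned pattern: one static array shared by all instances.
    static final class SharedKeyExtractor {
        private static String[] upsertKey;

        SharedKeyExtractor(String[] key) {
            upsertKey = key;
        }

        String describe() {
            return Arrays.toString(upsertKey);
        }
    }

    // Alternative: each instance owns its key, and the field is serialized with it.
    static final class InstanceKeyExtractor {
        private final String[] upsertKey;

        InstanceKeyExtractor(String[] key) {
            this.upsertKey = key.clone();
        }

        String describe() {
            return Arrays.toString(upsertKey);
        }
    }

    public static void main(String[] args) {
        SharedKeyExtractor first = new SharedKeyExtractor(new String[] {"id"});
        new SharedKeyExtractor(new String[] {"name"});
        System.out.println(first.describe()); // prints [name]: the key was overwritten

        InstanceKeyExtractor own = new InstanceKeyExtractor(new String[] {"id"});
        new InstanceKeyExtractor(new String[] {"name"});
        System.out.println(own.describe()); // prints [id]
    }
}
```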



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/matchIT/mongodb_matchProjection_source_to_assert.conf:
##########
@@ -0,0 +1,94 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config

Review Comment:
   the same as above



##########
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/RowDataToBsonConverters.java:
##########
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.serde;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
+
+import org.bson.BsonArray;
+import org.bson.BsonBinary;
+import org.bson.BsonBoolean;
+import org.bson.BsonDateTime;
+import org.bson.BsonDecimal128;
+import org.bson.BsonDocument;
+import org.bson.BsonDouble;
+import org.bson.BsonInt32;
+import org.bson.BsonInt64;
+import org.bson.BsonNull;
+import org.bson.BsonString;
+import org.bson.BsonValue;
+import org.bson.json.JsonParseException;
+import org.bson.types.Decimal128;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.seatunnel.api.table.type.SqlType.NULL;
+import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_DATA_TYPE;
+import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.ENCODE_VALUE_FIELD;
+import static org.apache.seatunnel.connectors.seatunnel.mongodb.serde.BsonToRowDataConverters.fromBigDecimal;
+
+public class RowDataToBsonConverters implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    @FunctionalInterface
+    public interface RowDataToBsonConverter extends Serializable {
+        BsonDocument convert(SeaTunnelRow rowData);
+    }
+
+    public static RowDataToBsonConverter createConverter(SeaTunnelDataType<?> type) {
+        SerializableFunction<Object, BsonValue> internalRowConverter =
+                createNullSafeInternalConverter(type);
+        return new RowDataToBsonConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public BsonDocument convert(SeaTunnelRow rowData) {
+                return (BsonDocument) internalRowConverter.apply(rowData);
+            }
+        };
+    }
+
+    private static SerializableFunction<Object, BsonValue> createNullSafeInternalConverter(
+            SeaTunnelDataType<?> type) {
+        return wrapIntoNullSafeInternalConverter(createInternalConverter(type), type);
+    }
+
+    private static SerializableFunction<Object, BsonValue> wrapIntoNullSafeInternalConverter(
+            SerializableFunction<Object, BsonValue> internalConverter, SeaTunnelDataType<?> type) {
+        return new SerializableFunction<Object, BsonValue>() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public BsonValue apply(Object value) {
+                if (value == null || NULL.equals(type.getSqlType())) {
+                    throw new MongodbConnectorException(
+                            UNSUPPORTED_DATA_TYPE,
+                            "The column type is <"
+                                    + type
+                                    + ">, but a null value is being written into it");
+                } else {
+                    return internalConverter.apply(value);
+                }
+            }
+        };
+    }
+
+    private static SerializableFunction<Object, BsonValue> createInternalConverter(
+            SeaTunnelDataType<?> type) {
+        switch (type.getSqlType()) {
+            case NULL:
+                return new SerializableFunction<Object, BsonValue>() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public BsonValue apply(Object value) {
+                        return BsonNull.VALUE;
+                    }
+                };
+            case BOOLEAN:
+                return new SerializableFunction<Object, BsonValue>() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public BsonValue apply(Object value) {
+                        return new BsonBoolean((boolean) value);
+                    }
+                };
+            case TINYINT:
+            case SMALLINT:
+            case INT:
+                return new SerializableFunction<Object, BsonValue>() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public BsonValue apply(Object value) {
+                        int intValue =
+                                value instanceof Byte
+                                        ? ((Byte) value) & 0xFF
+                                        : value instanceof Short
+                                                ? ((Short) value).intValue()
+                                                : (int) value;
+                        return new BsonInt32(intValue);
+                    }
+                };
+            case BIGINT:
+                return new SerializableFunction<Object, BsonValue>() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public BsonValue apply(Object value) {
+                        return new BsonInt64((long) value);
+                    }
+                };
+            case FLOAT:
+            case DOUBLE:
+                return new SerializableFunction<Object, BsonValue>() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public BsonValue apply(Object value) {
+                        double v =
+                                value instanceof Float
+                                        ? ((Float) value).doubleValue()
+                                        : (double) value;
+                        return new BsonDouble(v);
+                    }
+                };
+            case STRING:
+                return new SerializableFunction<Object, BsonValue>() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public BsonValue apply(Object value) {
+                        String val = value.toString();
+                        // try to parse out the mongodb specific data type from extend-json.
+                        if (val.startsWith("{")
+                                && val.endsWith("}")
+                                && val.contains(ENCODE_VALUE_FIELD)) {
+                            try {
+                                BsonDocument doc = BsonDocument.parse(val);
+                                if (doc.containsKey(ENCODE_VALUE_FIELD)) {
+                                    return doc.get(ENCODE_VALUE_FIELD);
+                                }
+                            } catch (JsonParseException e) {
+                                // invalid json format, fallback to store as a bson string.
+                                return new BsonString(value.toString());
+                            }
+                        }
+                        return new BsonString(value.toString());
+                    }
+                };
+            case BYTES:
+                return new SerializableFunction<Object, BsonValue>() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public BsonValue apply(Object value) {
+                        return new BsonBinary((byte[]) value);
+                    }
+                };
+            case DATE:
+                return new SerializableFunction<Object, BsonValue>() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public BsonValue apply(Object value) {
+                        LocalDate localDate = (LocalDate) value;
+                        return new BsonDateTime(
+                                localDate
+                                        .atStartOfDay(ZoneId.systemDefault())
+                                        .toInstant()
+                                        .toEpochMilli());
+                    }
+                };
+            case TIMESTAMP:
+                return new SerializableFunction<Object, BsonValue>() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public BsonValue apply(Object value) {
+                        LocalDateTime localDateTime = (LocalDateTime) value;
+                        return new BsonDateTime(
+                                localDateTime
+                                        .atZone(ZoneId.systemDefault())
+                                        .toInstant()
+                                        .toEpochMilli());
+                    }
+                };
+            case DECIMAL:
+                return new SerializableFunction<Object, BsonValue>() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public BsonValue apply(Object value) {
+                        DecimalType decimalType = (DecimalType) type;
+                        BigDecimal decimalVal = (BigDecimal) value;
+                        return new BsonDecimal128(
+                                new Decimal128(
+                                        Objects.requireNonNull(
+                                                fromBigDecimal(
+                                                        decimalVal,
+                                                        decimalType.getPrecision(),
+                                                        decimalType.getScale()))));
+                    }
+                };
+            case ARRAY:
+                return createArrayConverter((ArrayType<?, ?>) type);
+            case MAP:
+                MapType<?, ?> mapType = (MapType<?, ?>) type;
+                return createMapConverter(
+                        mapType.toString(), mapType.getKeyType(), mapType.getValueType());
+            case ROW:
+                return createRowConverter((SeaTunnelRowType) type);
+            default:
+                throw new UnsupportedOperationException("Not support to parse type: " + type);

Review Comment:
   Use `MongodbConnectorException` here instead of `UnsupportedOperationException`.
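
A minimal sketch of what this comment appears to ask for: the `default` branch throws the connector's own exception (spelled `MongodbConnectorException` in this PR's imports) with an `UNSUPPORTED_DATA_TYPE` code, as the null check earlier in the same file already does. `ErrorCode`, `ConnectorException`, and `SqlType` below are simplified, hypothetical stand-ins so the snippet compiles on its own; they are not the SeaTunnel classes.

```java
import java.util.function.Function;

public class DefaultBranchSketch {

    // Hypothetical stand-in for CommonErrorCode.UNSUPPORTED_DATA_TYPE.
    enum ErrorCode { UNSUPPORTED_DATA_TYPE }

    // Hypothetical stand-in for MongodbConnectorException.
    static class ConnectorException extends RuntimeException {
        final ErrorCode code;

        ConnectorException(ErrorCode code, String message) {
            super(code + ": " + message);
            this.code = code;
        }
    }

    // Simplified stand-in for SqlType; TIME plays the role of an unsupported type.
    enum SqlType { BOOLEAN, BIGINT, TIME }

    static Function<Object, String> createInternalConverter(SqlType type) {
        switch (type) {
            case BOOLEAN:
                return v -> "bool:" + v;
            case BIGINT:
                return v -> "int64:" + v;
            default:
                // Connector-specific exception instead of UnsupportedOperationException.
                throw new ConnectorException(
                        ErrorCode.UNSUPPORTED_DATA_TYPE, "Not support to parse type: " + type);
        }
    }

    public static void main(String[] args) {
        System.out.println(createInternalConverter(SqlType.BIGINT).apply(42L));
        try {
            createInternalConverter(SqlType.TIME);
        } catch (ConnectorException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Throwing one exception type throughout lets callers catch a single class and branch on the error code.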



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/updateIT/update_mongodb_to_assert.conf:
##########
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######

Review Comment:
   the same as above



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/updateIT/fake_source_to_update_mongodb.conf:
##########
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config

Review Comment:
   the same as above



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/updateIT/fake_source_to_updateMode_insert_mongodb.conf:
##########
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config

Review Comment:
   the same as above



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/flatIT/fake_source_to_flat_mongodb.conf:
##########
@@ -0,0 +1,102 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config

Review Comment:
   remove this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] MonsterChenzhuo commented on pull request #4620: [Feature][Connector-v2][Mongodb]Refactor mongodb connector

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on PR #4620:
URL: https://github.com/apache/incubator-seatunnel/pull/4620#issuecomment-1541445545

   @hailin0 @EricJoy2048  PTAL, thanks.




[GitHub] [incubator-seatunnel] MonsterChenzhuo commented on pull request #4620: [Feature][Connector-v2][Mongodb]Refactor mongodb connector

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on PR #4620:
URL: https://github.com/apache/incubator-seatunnel/pull/4620#issuecomment-1537034737

   <img width="449" alt="image" src="https://user-images.githubusercontent.com/60029759/236597659-7a646356-25d8-4781-b732-1edf78c9a825.png">
   <img width="549" alt="image" src="https://user-images.githubusercontent.com/60029759/236597713-9387b341-2e1d-4a15-82b6-95fc59fe51dc.png">
   Test writing 10 million records ("1000w").




[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #4620: [Feature][Connector-v2][Mongodb]Refactor mongodb connector

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on code in PR #4620:
URL: https://github.com/apache/incubator-seatunnel/pull/4620#discussion_r1188081012


##########
docs/en/connector-v2/sink/MongoDB.md:
##########
@@ -2,52 +2,223 @@
 
 > MongoDB sink connector
 
-## Description
+The MongoDB Connector provides the ability to read and write data from and to MongoDB.
+This document describes how to set up the MongoDB connector to run data writers against MongoDB.
 
-Write data to `MongoDB`
+Support those engines
+---------------------
 
-## Key features
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+Key features
+-------------
 
 - [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [cdc](../../concept/connector-v2-features.md)
+
+Dependencies
+------------
+
+In order to use the Mongodb connector, the following dependencies are required.
+They can be downloaded via install-plugin.sh or from the Maven central repository.
+
+| MongoDB version |                                                  dependency                                                   |
+|-----------------|---------------------------------------------------------------------------------------------------------------|
+| universal       | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-mongodb) |
+
+Data Type Mapping
+-----------------
+
+The following table lists the field data type mapping from MongoDB BSON type to Seatunnel data type.
+
+| Seatunnel type | MongoDB BSON type |
+|----------------|-------------------|
+| STRING         | ObjectId          |
+| STRING         | String            |
+| BOOLEAN        | Boolean           |
+| BINARY         | Binary            |
+| INTEGER        | Int32             |
+| TINYINT        | Int32             |
+| SMALLINT       | Int32             |
+| BIGINT         | Int64             |
+| DOUBLE         | Double            |
+| FLOAT          | Double            |
+| DECIMAL        | Decimal128        |
+| Date           | Date              |
+| Timestamp      | Timestamp[Date]   |
+| ROW            | Object            |
+| ARRAY          | Array             |
+
+Tips:
+1. When using SeaTunnel to write Date and Timestamp types to MongoDB, both produce a Date value in MongoDB, but the precision differs. A SeaTunnel Date is stored as the start of that day (no time-of-day component), while a SeaTunnel Timestamp keeps millisecond precision.
+
+2. When using the DECIMAL type in SeaTunnel, note that the precision cannot exceed 34 digits (the Decimal128 limit), for example decimal(34, 18).
+
+Connector Options
+-----------------
+
+|        Option         | Required | Default |   Type   |                                                  Description                                                   |
+|-----------------------|----------|---------|----------|----------------------------------------------------------------------------------------------------------------|
+| uri                   | required | (none)  | String   | The MongoDB connection uri.                                                                                    |
+| database              | required | (none)  | String   | The name of MongoDB database to read or write.                                                                 |
+| collection            | required | (none)  | String   | The name of MongoDB collection to read or write.                                                               |
+| schema                | required | (none)  | String   | MongoDB's BSON and seatunnel data structure mapping                                                            |
+| buffer-flush.max-rows | optional | 1000    | String   | Specifies the maximum number of buffered rows per batch request.                                               |
+| buffer-flush.interval | optional | 30000   | String   | Specifies the retry time interval if writing records to database failed, the unit is seconds.                  |
+| retry.max             | optional | default | String   | Specifies the max retry times if writing records to database failed.                                           |
+| retry.interval        | optional | 1000    | Duration | Specifies the retry time interval if writing records to database failed, the unit is millisecond.              |
+| upsert-enable         | optional | false   | Boolean  | Whether to write documents via upsert mode.                                                                    |
+| upsert-key            | optional | (none)  | List     | The primary keys for upsert. Only valid in upsert mode. Keys are in `["id","name",...]` format for properties. |
+
+How to create a MongoDB Data synchronization jobs
+-------------------------------------------------
+
+The following example demonstrates how to create a data synchronization job that writes randomly generated data to a MongoDB database:
 
-## Options
+```bash
+# Set the basic configuration of the task to be performed
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  checkpoint.interval  = 1000
+}
 
-|      name      |  type  | required | default value |
-|----------------|--------|----------|---------------|
-| uri            | string | yes      | -             |
-| database       | string | yes      | -             |
-| collection     | string | yes      | -             |
-| common-options | config | no       | -             |
+source {
+  FakeSource {
+      row.num = 2
+      bigint.min = 0
+      bigint.max = 10000000
+      split.num = 1
+      split.read-interval = 300
+      schema {
+        fields {
+          c_bigint = bigint
+        }
+      }
+    }
+}
 
-### uri [string]
+sink {
+  MongoDB{
+    uri = mongodb://user:password@127.0.0.1:27017
+    database = "test"
+    collection = "test"
+    schema = {
+      fields {
+        _id = string
+        c_bigint = bigint
+        }

Review Comment:
   ```suggestion
         }
   ```



##########
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/config/MongodbWriterOptions.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.sink.config;
+
+import lombok.Getter;
+
+import java.io.Serializable;
+
+@Getter
+public class MongodbWriterOptions implements Serializable {

Review Comment:
   move to `org.apache.seatunnel.connectors.seatunnel.mongodb.sink`



##########
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java:
##########
@@ -18,24 +18,37 @@
 package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
 
 import com.google.auto.service.AutoService;
 
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;
+import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;
 
 @AutoService(Factory.class)
 public class MongodbSinkFactory implements TableSinkFactory {
     @Override
     public String factoryIdentifier() {
-        return "MongoDB";
+        return CONNECTOR_IDENTITY;
     }
 
     @Override
     public OptionRule optionRule() {
-        return OptionRule.builder().required(URI, DATABASE, COLLECTION).build();
+        return OptionRule.builder()
+                .required(
+                        MongodbConfig.COLLECTION,
+                        MongodbConfig.DATABASE,
+                        MongodbConfig.COLLECTION,

Review Comment:
   duplicate config `COLLECTION` ?
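
A small sketch of the problem being flagged, under stated assumptions. The quoted hunk ends at the flagged line, so the rest of the `required(...)` list is not visible here. The rule this diff removes required `URI`, `DATABASE`, and `COLLECTION`, so the first `COLLECTION` may have been meant as the URI option; that reading is a guess. The string constants and the `duplicates` helper are hypothetical and are not part of `OptionRule` or `MongodbConfig`.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class RequiredOptionsSketch {

    // Hypothetical stand-ins for the option constants in MongodbConfig.
    static final String URI = "uri";
    static final String DATABASE = "database";
    static final String COLLECTION = "collection";

    // Returns the option keys that appear more than once, in first-seen order.
    static Set<String> duplicates(List<String> required) {
        Set<String> seen = new LinkedHashSet<>();
        Set<String> dupes = new LinkedHashSet<>();
        for (String key : required) {
            if (!seen.add(key)) {
                dupes.add(key);
            }
        }
        return dupes;
    }

    public static void main(String[] args) {
        // As quoted in the hunk: COLLECTION is listed twice.
        List<String> asQuoted = Arrays.asList(COLLECTION, DATABASE, COLLECTION);
        // Presumed intent, matching the rule this diff removes.
        List<String> presumed = Arrays.asList(URI, DATABASE, COLLECTION);
        System.out.println(duplicates(asQuoted));
        System.out.println(duplicates(presumed));
    }
}
```

A check like this could live in a unit test for the factory, so a repeated or missing required option is caught before review.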



##########
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/writer/MongodbWriter.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.sink.writer;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbClientProvider;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbCollectionProvider;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentSerializer;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.config.MongodbWriterOptions;
+
+import org.bson.BsonDocument;
+
+import com.mongodb.MongoException;
+import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.WriteModel;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITER_OPERATION_FAILED;
+
+@Slf4j
+public class MongodbWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {

Review Comment:
   move to `org.apache.seatunnel.connectors.seatunnel.mongodb.sink`



##########
docs/en/connector-v2/sink/MongoDB.md:
##########
@@ -2,52 +2,223 @@
 
 > MongoDB sink connector
 
-## Description
+The MongoDB Connector provides the ability to read and write data from and to MongoDB.
+This document describes how to set up the MongoDB connector to run data writers against MongoDB.
 
-Write data to `MongoDB`
+Support those engines
+---------------------
 
-## Key features
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+Key features
+-------------
 
 - [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [cdc](../../concept/connector-v2-features.md)
+
+Dependencies
+------------
+
+In order to use the Mongodb connector, the following dependencies are required.
+They can be downloaded via install-plugin.sh or from the Maven central repository.
+
+| MongoDB version |                                                  dependency                                                   |
+|-----------------|---------------------------------------------------------------------------------------------------------------|
+| universal       | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-mongodb) |
+
+Data Type Mapping
+-----------------
+
+The following table lists the field data type mapping from MongoDB BSON type to Seatunnel data type.
+
+| Seatunnel type | MongoDB BSON type |
+|----------------|-------------------|
+| STRING         | ObjectId          |
+| STRING         | String            |
+| BOOLEAN        | Boolean           |
+| BINARY         | Binary            |
+| INTEGER        | Int32             |
+| TINYINT        | Int32             |
+| SMALLINT       | Int32             |
+| BIGINT         | Int64             |
+| DOUBLE         | Double            |
+| FLOAT          | Double            |
+| DECIMAL        | Decimal128        |
+| Date           | Date              |
+| Timestamp      | Timestamp[Date]   |
+| ROW            | Object            |
+| ARRAY          | Array             |
+
+Tips:
+1. When using SeaTunnel to write Date and Timestamp types to MongoDB, both produce a Date value in MongoDB, but the precision differs. A SeaTunnel Date is stored as the start of that day (no time-of-day component), while a SeaTunnel Timestamp keeps millisecond precision.
+
+2. When using the DECIMAL type in SeaTunnel, note that the precision cannot exceed 34 digits (the Decimal128 limit), for example decimal(34, 18).
+
+Connector Options
+-----------------
+
+|        Option         | Required | Default |   Type   |                                                  Description                                                   |
+|-----------------------|----------|---------|----------|----------------------------------------------------------------------------------------------------------------|
+| uri                   | required | (none)  | String   | The MongoDB connection uri.                                                                                    |
+| database              | required | (none)  | String   | The name of MongoDB database to read or write.                                                                 |
+| collection            | required | (none)  | String   | The name of MongoDB collection to read or write.                                                               |
+| schema                | required | (none)  | String   | MongoDB's BSON and seatunnel data structure mapping                                                            |
+| buffer-flush.max-rows | optional | 1000    | String   | Specifies the maximum number of buffered rows per batch request.                                               |
+| buffer-flush.interval | optional | 30000   | String   | Specifies the retry time interval if writing records to database failed, the unit is seconds.                  |
+| retry.max             | optional | default | String   | Specifies the max retry times if writing records to database failed.                                           |
+| retry.interval        | optional | 1000    | Duration | Specifies the retry time interval if writing records to database failed, the unit is millisecond.              |
+| upsert-enable         | optional | false   | Boolean  | Whether to write documents via upsert mode.                                                                    |
+| upsert-key            | optional | (none)  | List     | The primary keys for upsert. Only valid in upsert mode. Keys are in `["id","name",...]` format for properties. |
+
+How to create a MongoDB Data synchronization jobs
+-------------------------------------------------
+
+The following example demonstrates how to create a data synchronization job that writes randomly generated data to a MongoDB database:
 
-## Options
+```bash
+# Set the basic configuration of the task to be performed
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  checkpoint.interval  = 1000
+}
 
-|      name      |  type  | required | default value |
-|----------------|--------|----------|---------------|
-| uri            | string | yes      | -             |
-| database       | string | yes      | -             |
-| collection     | string | yes      | -             |
-| common-options | config | no       | -             |
+source {
+  FakeSource {
+      row.num = 2
+      bigint.min = 0
+      bigint.max = 10000000
+      split.num = 1
+      split.read-interval = 300
+      schema {
+        fields {
+          c_bigint = bigint
+        }
+      }
+    }
+}
 
-### uri [string]
+sink {
+  MongoDB{
+    uri = mongodb://user:password@127.0.0.1:27017
+    database = "test"
+    collection = "test"
+    schema = {
+      fields {
+        _id = string
+        c_bigint = bigint
+        }
+    }
+  }
+}
+```
 
-uri to write to mongoDB
+Parameter interpretation
+------------------------
 
-### database [string]
+**MongoDB database connection URI examples**
 
-database to write to mongoDB
+Unauthenticated single node connection:
 
-### collection [string]
+```bash
+mongodb://127.0.0.0:27017/mydb
+```
 
-collection to write to mongoDB
+Replica set connection:
 
-### common options
+```bash
+mongodb://127.0.0.0:27017/mydb?replicaSet=xxx
+```
 
-Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details
+Authenticated replica set connection:
 
-## Example
+```bash
+mongodb://admin:password@127.0.0.0:27017/mydb?replicaSet=xxx&authSource=admin
+```
+
+Multi-node replica set connection:
 
 ```bash
-mongodb {
-    uri = "mongodb://username:password@127.0.0.1:27017/mypost?retryWrites=true&writeConcern=majority"
-    database = "mydatabase"
-    collection = "mycollection"
+mongodb://127.0.0.1:27017,127.0.0.2:27017,127.0.0.3:27017/mydb?replicaSet=xxx
+```
+
+Sharded cluster connection:
+
+```bash
+mongodb://127.0.0.1:27017/mydb
+```
+
+Multiple mongos connections:
+
+```bash
+mongodb://192.168.0.1:27017,192.168.0.2:27017,192.168.0.3:27017/mydb
+```
+
+Note: The username and password in the URI must be URL-encoded before being concatenated into the connection string.
+
+**Buffer Flush**
+
+```bash
+sink {
+  MongoDB {
+    uri = "mongodb://user:password@127.0.0.1:27017"
+    database = "test_db"
+    collection = "users"
+    buffer-flush.max-rows = 2000
+    buffer-flush.interval = 1000
+    schema = {
+      fields {
+        _id = string
+        id = bigint
+        status = string
+      }
+    }
+  }
+}
+```
+
+**Why is it not recommended to use transactions for operation?**
+
+Although MongoDB has fully supported multi-document transactions since version 4.2, that does not mean they should be used indiscriminately.
+Transactions bring locking, coordination between nodes, additional overhead, and a performance cost.
+The guiding principle should therefore be: avoid transactions where possible.
+Careful system design removes most of the need for them.
+
+**Idempotent Writes**
+
+By specifying a clear primary key and using the upsert method, exactly-once write semantics can be achieved.
+
+If upsert-key is defined in the configuration, the MongoDB sink will use upsert semantics instead of regular INSERT statements. We combine the primary keys declared in upsert-key as the MongoDB reserved primary key and use upsert mode for writing to ensure idempotent writes.
+In the event of a failure, SeaTunnel jobs recover from the last successful checkpoint and reprocess data, which may result in duplicate message processing during recovery. Upsert mode is highly recommended, as it avoids violating database primary key constraints and generating duplicate data when records are reprocessed.
+
+```bash
+sink {
+  MongoDB {
+    uri = "mongodb://user:password@127.0.0.1:27017"
+    database = "test_db"
+    collection = "users"
+    upsert-enable = true
+    upsert-key = ["name","status"]
+    schema = {
+      fields {
+        _id = string
+        name = string
+        status = string
+      }
+    }
+  }
 }
 ```
 
 ## Changelog
 
 ### 2.2.0-beta 2022-09-26
 
-- Add MongoDB Sink Connector
+- Add MongoDB Source Connector
+
+### Next Version
+
+- [Feature]Refactor mongodb source connector([4380](https://github.com/apache/incubator-seatunnel/pull/4380))

Review Comment:
   ```suggestion
   - [Feature]Refactor mongodb source connector([4620](https://github.com/apache/incubator-seatunnel/pull/4620))
   ```
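
The note in the documentation diff above, that the username and password must be URL-encoded before being concatenated into the connection string, can be sketched in Java as follows. This is only an illustration: `MongoUriSketch`, `encode`, and `buildUri` are hypothetical names that do not appear in the PR, and the sketch uses the JDK's `URLEncoder` rather than anything from the connector itself.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

/** Hypothetical helper, not part of the PR: builds a MongoDB URI with encoded credentials. */
public class MongoUriSketch {

    /** Percent-encodes one URI component such as a user name or password. */
    public static String encode(String raw) {
        try {
            // URLEncoder produces form encoding, where a space becomes '+'.
            // A literal '+' in the input is already emitted as %2B, so the
            // replacement below only touches encoded spaces.
            return URLEncoder.encode(raw, "UTF-8").replace("+", "%20");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 should always be available", e);
        }
    }

    /** Concatenates the encoded credentials with an already valid host:port part. */
    public static String buildUri(String user, String password, String hostAndPort) {
        return "mongodb://" + encode(user) + ":" + encode(password) + "@" + hostAndPort;
    }
}
```

Encoding each credential separately matters because characters such as `@`, `:` and `/` would otherwise be read as URI delimiters.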



##########
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/reader/MongodbReader.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.source.reader;
+
+import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbClientProvider;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentDeserializer;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.source.config.MongodbReadOptions;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.source.split.MongoSplit;
+
+import org.bson.BsonDocument;
+
+import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoCursor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.TimeUnit;
+
+/** MongoReader reads MongoDB by splits (queries). */
+@Slf4j
+public class MongodbReader implements SourceReader<SeaTunnelRow, MongoSplit> {
+
+    private final Queue<MongoSplit> pendingSplits;
+
+    private final DocumentDeserializer<SeaTunnelRow> deserializer;
+
+    private final SourceReader.Context context;
+
+    private final MongodbClientProvider clientProvider;
+
+    private MongoCursor<BsonDocument> cursor;
+
+    private MongoSplit currentSplit;
+
+    private final MongodbReadOptions readOptions;
+
+    private volatile boolean noMoreSplit;
+
+    public MongodbReader(
+            SourceReader.Context context,
+            MongodbClientProvider clientProvider,
+            DocumentDeserializer<SeaTunnelRow> deserializer,
+            MongodbReadOptions mongodbReadOptions) {
+        this.deserializer = deserializer;
+        this.context = context;
+        this.clientProvider = clientProvider;
+        pendingSplits = new ConcurrentLinkedDeque<>();
+        this.readOptions = mongodbReadOptions;
+    }
+
+    @Override
+    public void open() throws Exception {
+        if (cursor != null) {
+            cursor.close();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (cursor != null) {
+            cursor.close();
+        }
+    }
+
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        synchronized (output.getCheckpointLock()) {
+            currentSplit = pendingSplits.poll();
+            if (null != currentSplit) {
+                if (cursor != null) {
+                    // current split is in-progress
+                    return;
+                }
+                log.info("Prepared to read split {}", currentSplit.splitId());
+                FindIterable<BsonDocument> rs =
+                        clientProvider
+                                .getDefaultCollection()
+                                .find(currentSplit.getQuery())
+                                .projection(currentSplit.getProjection())
+                                .batchSize(readOptions.getFetchSize())
+                                .noCursorTimeout(readOptions.isNoCursorTimeout())
+                                .maxTime(readOptions.getMaxTimeMS(), TimeUnit.MINUTES);
+                cursor = rs.iterator();
+                while (cursor.hasNext()) {
+                    SeaTunnelRow deserialize = deserializer.deserialize(cursor.next());
+                    output.collect(deserialize);
+                }
+                closeCurrentSplit();
+            }
+            if (noMoreSplit && pendingSplits.isEmpty()) {
+                // signal to the source that we have reached the end of the data.
+                log.info("Closed the bounded mongodb source");
+                context.signalNoMoreElement();
+            }
+        }
+    }
+
+    @Override
+    public List<MongoSplit> snapshotState(long checkpointId) throws Exception {
+        return new ArrayList<>(Collections.singleton(currentSplit));

Review Comment:
   should store `pendingSplits`?
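
One way to read this comment: the snapshot should contain every split the reader still owns, not just the one in progress. The stand-alone sketch below illustrates that idea under stated assumptions. `SnapshotSketch` is a hypothetical class, `String` stands in for `MongoSplit`, and none of the SeaTunnel interfaces are used, so it is not the change that was eventually made in the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;

/** Hypothetical stand-in for the reader; String plays the role of MongoSplit. */
public class SnapshotSketch {

    private final Queue<String> pendingSplits = new ConcurrentLinkedDeque<>();

    // null while no split is being read
    private String currentSplit;

    public void addSplits(List<String> splits) {
        pendingSplits.addAll(splits);
    }

    /** Takes the next split off the queue, as pollNext does in the quoted diff. */
    public void startNext() {
        currentSplit = pendingSplits.poll();
    }

    /** Returns the in-progress split, if any, followed by all queued splits. */
    public List<String> snapshotState() {
        List<String> state = new ArrayList<>();
        if (currentSplit != null) {
            state.add(currentSplit);
        }
        state.addAll(pendingSplits);
        return state;
    }
}
```

The null check is there because `poll()` returns `null` on an empty queue, and wrapping that value in `Collections.singleton` would put a `null` entry into the checkpoint state.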



##########
docs/en/connector-v2/source/MongoDB.md:
##########
@@ -91,5 +446,5 @@ mongodb {
 
 ### Next Version
 
-- common-options is not a required option
+- [Feature]Refactor mongodb source connector([4380](https://github.com/apache/incubator-seatunnel/pull/4380))

Review Comment:
   ```suggestion
   - [Feature]Refactor mongodb source connector([4620](https://github.com/apache/incubator-seatunnel/pull/4620))
   ```



##########
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceFactory.java:
##########
@@ -22,31 +22,45 @@
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.source.split.MongoSplit;
 
 import com.google.auto.service.AutoService;
 
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.MATCHQUERY;
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;
+import java.util.ArrayList;
+
+import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;
 
 @AutoService(Factory.class)
 public class MongodbSourceFactory implements TableSourceFactory {
     @Override
     public String factoryIdentifier() {
-        return "MongoDB";
+        return CONNECTOR_IDENTITY;
     }
 
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(URI, DATABASE, COLLECTION, CatalogTableUtil.SCHEMA)
-                .optional(MATCHQUERY)
+                .required(
+                        MongodbConfig.COLLECTION,
+                        MongodbConfig.DATABASE,
+                        MongodbConfig.COLLECTION,

Review Comment:
   duplicate config `COLLECTION` ?
   
   missing `url`?



##########
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java:
##########
@@ -18,24 +18,37 @@
 package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
 
 import com.google.auto.service.AutoService;
 
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.COLLECTION;
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.DATABASE;
-import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbOption.URI;
+import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;
 
 @AutoService(Factory.class)
 public class MongodbSinkFactory implements TableSinkFactory {
     @Override
     public String factoryIdentifier() {
-        return "MongoDB";
+        return CONNECTOR_IDENTITY;
     }
 
     @Override
     public OptionRule optionRule() {
-        return OptionRule.builder().required(URI, DATABASE, COLLECTION).build();
+        return OptionRule.builder()
+                .required(
+                        MongodbConfig.COLLECTION,
+                        MongodbConfig.DATABASE,
+                        MongodbConfig.COLLECTION,

Review Comment:
   missing `uri` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] MonsterChenzhuo commented on a diff in pull request #4620: [Feature][Connector-v2][Mongodb]Refactor mongodb connector

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #4620:
URL: https://github.com/apache/incubator-seatunnel/pull/4620#discussion_r1188868373


##########
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriter.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbClientProvider;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbCollectionProvider;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentSerializer;
+
+import org.bson.BsonDocument;
+
+import com.mongodb.MongoException;
+import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.WriteModel;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITER_OPERATION_FAILED;
+
+@Slf4j
+public class MongodbWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private MongodbClientProvider collectionProvider;
+
+    private final DocumentSerializer<SeaTunnelRow> serializer;
+
+    private long bulkActions;
+
+    private final List<WriteModel<BsonDocument>> bulkRequests;
+
+    private int maxRetries;
+
+    private long retryIntervalMs;
+
+    private long batchIntervalMs;
+
+    private volatile long lastSendTime = 0L;
+
+    public MongodbWriter(
+            DocumentSerializer<SeaTunnelRow> serializer, MongodbWriterOptions options) {
+        initOptions(options);
+        this.serializer = serializer;
+        this.bulkRequests = new ArrayList<>();
+    }
+
+    private void initOptions(MongodbWriterOptions options) {
+        this.maxRetries = options.getRetryMax();
+        this.retryIntervalMs = options.getRetryInterval();
+        this.collectionProvider =
+                MongodbCollectionProvider.getBuilder()
+                        .connectionString(options.getConnectString())
+                        .database(options.getDatabase())
+                        .collection(options.getCollection())
+                        .build();
+        this.bulkActions = options.getFlushSize();
+        this.batchIntervalMs = options.getBatchIntervalMs();
+    }
+
+    @Override
+    public void write(SeaTunnelRow o) throws IOException {
+        bulkRequests.add(serializer.serializeToWriteModel(o));
+        if (isOverMaxBatchSizeLimit() || isOverMaxBatchIntervalLimit()) {
+            doBulkWrite();
+        }
+    }
+

Review Comment:
   <img width="1236" alt="image" src="https://github.com/apache/incubator-seatunnel/assets/60029759/8dbd46fc-d57c-44e2-8a12-55f0e1704611">
   





[GitHub] [incubator-seatunnel] MonsterChenzhuo commented on a diff in pull request #4620: [Feature][Connector-v2][Mongodb]Refactor mongodb connector

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #4620:
URL: https://github.com/apache/incubator-seatunnel/pull/4620#discussion_r1174539525


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_to_assert.conf:
##########
@@ -19,55 +19,102 @@
 ######
 
 env {
-  # You can set flink configuration here
   execution.parallelism = 1
   job.mode = "BATCH"
-  #execution.checkpoint.interval = 10000
-  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
 }
 
 source {
-  MongoDB {
-    uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
+  Mongodb {

Review Comment:
   The source (read) configuration sets `retryWrites` and `writeConcern`, which are write-side options and make no sense for a reader.





[GitHub] [incubator-seatunnel] MonsterChenzhuo commented on pull request #4620: [Feature][Connector-v2][Mongodb]Refactor mongodb connector

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on PR #4620:
URL: https://github.com/apache/incubator-seatunnel/pull/4620#issuecomment-1541293765

   @hailin0 PTAL, thanks.




[GitHub] [incubator-seatunnel] MonsterChenzhuo commented on pull request #4620: [Feature][Connector-v2][Mongodb]Refactor mongodb connector

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on PR #4620:
URL: https://github.com/apache/incubator-seatunnel/pull/4620#issuecomment-1537101628

   @TyrantLucifer @hailin0 @EricJoy2048 PTAL, thanks.




[GitHub] [incubator-seatunnel] TyrantLucifer commented on a diff in pull request #4620: [Feature][Connector-v2][Mongodb]Refactor mongodb connector

Posted by "TyrantLucifer (via GitHub)" <gi...@apache.org>.
TyrantLucifer commented on code in PR #4620:
URL: https://github.com/apache/incubator-seatunnel/pull/4620#discussion_r1172049883


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java:
##########
@@ -71,13 +62,23 @@ public class MongodbIT extends TestSuiteBase implements TestResource {
     private static final String MONGODB_CONTAINER_HOST = "e2e_mongodb";
     private static final int MONGODB_PORT = 27017;
     private static final String MONGODB_DATABASE = "test_db";
-    private static final String MONGODB_SOURCE_TABLE = "source_table";
 
-    private static final List<Document> TEST_DATASET = generateTestDataSet(0, 10);
+    private static final String MONGODB_SOURCE_TABLE = "test_match_op_db";
 
     private GenericContainer<?> mongodbContainer;
     private MongoClient client;
 
+    private static final Random random = new Random();

Review Comment:
   Static fields should be declared at the top of the class.



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_matchProjection_source_to_assert.conf:
##########
@@ -0,0 +1,99 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  Mongodb {
+    connection = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true"
+    database = "test_db"
+    collection = "sink_table"
+    match.projection = "{ c_bigint:0 }"
+    result_table_name = "mongodb_table"
+    no-timeout = true
+    fetch.size = 1000
+    max.time.min = 100
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = int
+        c_smallint = int
+        c_int = int
+        c_float = double
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(33, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = int
+          c_smallint = int
+          c_int = int
+          c_bigint = bigint
+          c_float = double
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(33, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+  }
+}
+
+
+sink {
+  Console {
+        source_table_name = "mongodb_table"
+      }
+  Assert {
+  source_table_name = "mongodb_table"
+    rules =
+      {
+        row_rules = [
+          {

Review Comment:
   Add more check rules.



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_to_assert.conf:
##########
@@ -19,55 +19,81 @@
 ######
 
 env {
-  # You can set flink configuration here
   execution.parallelism = 1
   job.mode = "BATCH"
-  #execution.checkpoint.interval = 10000
-  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
 }
 
 source {
-  MongoDB {
-    uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
+  Mongodb {
+    connection = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true"
     database = "test_db"
-    collection = "source_table"
+    collection = "sink_table"
+    no-timeout = true
+    fetch.size = 1000
+    max.time.min = 100
+    result_table_name = "mongodb_table"
     schema = {
       fields {
-        id = bigint
-        c_map = "map<string, smallint>"
-        c_array = "array<tinyint>"
+        c_map = "map<string, string>"
+        c_array = "array<int>"
         c_string = string
         c_boolean = boolean
-        c_tinyint = tinyint
-        c_smallint = smallint
+        c_tinyint = int

Review Comment:
   Why change it? The connector should support every type that SeaTunnel defines. Although MongoDB does not distinguish between these types, you should add the conversion logic in your connector.
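
A minimal sketch of the kind of conversion logic this comment asks for, assuming the value arrives from BSON as a 32-bit integer (BSON has int32 and int64 but no 8-bit or 16-bit integer type) and must be narrowed to the Java types that back `tinyint` and `smallint`. `NarrowingSketch` and its methods are hypothetical names, and the choice to reject out-of-range values instead of truncating them is an assumption, not something the reviewer specified.

```java
/** Hypothetical helper, not part of the PR: range-checked narrowing of int32 values. */
public class NarrowingSketch {

    /** Narrows an int32 to a byte (tinyint) and fails loudly on overflow. */
    public static byte toTinyint(int value) {
        if (value < Byte.MIN_VALUE || value > Byte.MAX_VALUE) {
            throw new IllegalArgumentException("Value out of tinyint range: " + value);
        }
        return (byte) value;
    }

    /** Narrows an int32 to a short (smallint) and fails loudly on overflow. */
    public static short toSmallint(int value) {
        if (value < Short.MIN_VALUE || value > Short.MAX_VALUE) {
            throw new IllegalArgumentException("Value out of smallint range: " + value);
        }
        return (short) value;
    }
}
```

With helpers along these lines, the e2e schema could keep `c_tinyint = tinyint` and `c_smallint = smallint` instead of widening both to `int`.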





[GitHub] [incubator-seatunnel] MonsterChenzhuo commented on pull request #4620: [Feature][Connector-v2][Mongodb]Refactor mongodb connector

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on PR #4620:
URL: https://github.com/apache/incubator-seatunnel/pull/4620#issuecomment-1547768950

   @TyrantLucifer  PTAL, thanks.




[GitHub] [incubator-seatunnel] MonsterChenzhuo commented on pull request #4620: [Feature][Connector-v2][Mongodb]Refactor mongodb connector

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on PR #4620:
URL: https://github.com/apache/incubator-seatunnel/pull/4620#issuecomment-1536562348

   <img width="461" alt="image" src="https://user-images.githubusercontent.com/60029759/236525395-a59c31c4-9074-42c1-8a9b-f85085e56490.png">
   <img width="434" alt="image" src="https://user-images.githubusercontent.com/60029759/236525415-6be68142-34e7-4134-9ec7-b28892f989b6.png">
   I tested reading a MongoDB table with 4 million rows today.
   




[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #4620: [Feature][Connector-v2][Mongodb]Refactor mongodb connector

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on code in PR #4620:
URL: https://github.com/apache/incubator-seatunnel/pull/4620#discussion_r1188789130


##########
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriter.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbClientProvider;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbCollectionProvider;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentSerializer;
+
+import org.bson.BsonDocument;
+
+import com.mongodb.MongoException;
+import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.WriteModel;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITER_OPERATION_FAILED;
+
+@Slf4j
+public class MongodbWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private MongodbClientProvider collectionProvider;
+
+    private final DocumentSerializer<SeaTunnelRow> serializer;
+
+    private long bulkActions;
+
+    private final List<WriteModel<BsonDocument>> bulkRequests;
+
+    private int maxRetries;
+
+    private long retryIntervalMs;
+
+    private long batchIntervalMs;
+
+    private volatile long lastSendTime = 0L;
+
+    public MongodbWriter(
+            DocumentSerializer<SeaTunnelRow> serializer, MongodbWriterOptions options) {
+        initOptions(options);
+        this.serializer = serializer;
+        this.bulkRequests = new ArrayList<>();
+    }
+
+    private void initOptions(MongodbWriterOptions options) {
+        this.maxRetries = options.getRetryMax();
+        this.retryIntervalMs = options.getRetryInterval();
+        this.collectionProvider =
+                MongodbCollectionProvider.getBuilder()
+                        .connectionString(options.getConnectString())
+                        .database(options.getDatabase())
+                        .collection(options.getCollection())
+                        .build();
+        this.bulkActions = options.getFlushSize();
+        this.batchIntervalMs = options.getBatchIntervalMs();
+    }
+
+    @Override
+    public void write(SeaTunnelRow o) throws IOException {
+        bulkRequests.add(serializer.serializeToWriteModel(o));
+        if (isOverMaxBatchSizeLimit() || isOverMaxBatchIntervalLimit()) {
+            doBulkWrite();
+        }
+    }
+

Review Comment:
   ```suggestion
       @Override
       public Optional<Void> prepareCommit() {
           doBulkWrite();
           return Optional.empty();
       }
   ```
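
The likely motivation for this suggestion, stated here as an assumption since the comment gives none, is that rows still sitting in the buffer when a checkpoint is taken would be lost if the job failed right afterwards. The stand-alone sketch below shows the pattern with a hypothetical `BufferedWriterSketch` class: `String` stands in for the write model, and an in-memory list of batches stands in for the bulk write to MongoDB.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical stand-in for a buffering sink writer; no MongoDB or SeaTunnel types involved. */
public class BufferedWriterSketch {

    private final int maxRows;
    private final List<String> buffer = new ArrayList<>();

    // Each entry is one flushed batch; this replaces the real bulk write.
    private final List<List<String>> flushedBatches = new ArrayList<>();

    public BufferedWriterSketch(int maxRows) {
        this.maxRows = maxRows;
    }

    /** Buffers a row and flushes once the size limit is reached. */
    public void write(String row) {
        buffer.add(row);
        if (buffer.size() >= maxRows) {
            flush();
        }
    }

    /** Flushes whatever is buffered so the checkpoint covers every row received so far. */
    public Optional<Void> prepareCommit() {
        flush();
        return Optional.empty();
    }

    private void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flushedBatches.add(new ArrayList<>(buffer));
        buffer.clear();
    }

    public List<List<String>> getFlushedBatches() {
        return flushedBatches;
    }
}
```

Without the flush in `prepareCommit`, a batch smaller than the size limit would stay in memory until more rows arrive or the interval elapses.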



##########
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriter.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbClientProvider;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbCollectionProvider;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentSerializer;
+
+import org.bson.BsonDocument;
+
+import com.mongodb.MongoException;
+import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.WriteModel;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITER_OPERATION_FAILED;
+
+@Slf4j
+public class MongodbWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private MongodbClientProvider collectionProvider;
+
+    private final DocumentSerializer<SeaTunnelRow> serializer;
+
+    private long bulkActions;
+
+    private final List<WriteModel<BsonDocument>> bulkRequests;
+
+    private int maxRetries;
+
+    private long retryIntervalMs;
+
+    private long batchIntervalMs;
+
+    private volatile long lastSendTime = 0L;
+
+    public MongodbWriter(
+            DocumentSerializer<SeaTunnelRow> serializer, MongodbWriterOptions options) {
+        initOptions(options);
+        this.serializer = serializer;
+        this.bulkRequests = new ArrayList<>();
+    }
+
+    private void initOptions(MongodbWriterOptions options) {
+        this.maxRetries = options.getRetryMax();
+        this.retryIntervalMs = options.getRetryInterval();
+        this.collectionProvider =
+                MongodbCollectionProvider.getBuilder()
+                        .connectionString(options.getConnectString())
+                        .database(options.getDatabase())
+                        .collection(options.getCollection())
+                        .build();
+        this.bulkActions = options.getFlushSize();
+        this.batchIntervalMs = options.getBatchIntervalMs();
+    }
+
+    @Override
+    public void write(SeaTunnelRow o) throws IOException {
+        bulkRequests.add(serializer.serializeToWriteModel(o));
+        if (isOverMaxBatchSizeLimit() || isOverMaxBatchIntervalLimit()) {
+            doBulkWrite();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        doBulkWrite();
+        if (collectionProvider != null) {
+            collectionProvider.close();
+        }
+    }
+
+    void doBulkWrite() throws IOException {

Review Comment:
   ```suggestion
       synchronized void doBulkWrite() throws IOException {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 merged pull request #4620: [Feature][Connector-v2][Mongodb]Refactor mongodb connector

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 merged PR #4620:
URL: https://github.com/apache/incubator-seatunnel/pull/4620




[GitHub] [incubator-seatunnel] MonsterChenzhuo commented on pull request #4620: [Feature][Connector-v2][Mongodb]Refactor mongodb connector

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on PR #4620:
URL: https://github.com/apache/incubator-seatunnel/pull/4620#issuecomment-1540168948

   @hailin0 PTAL, thanks.




[GitHub] [incubator-seatunnel] MonsterChenzhuo commented on pull request #4620: [Feature][Connector-v2][Mongodb]Refactor mongodb connector

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on PR #4620:
URL: https://github.com/apache/incubator-seatunnel/pull/4620#issuecomment-1514350026

   I will commit the changes to the release-note.md file after the PR review is completed, in order to prevent conflicts that may cause the CI to not run.




[GitHub] [incubator-seatunnel] MonsterChenzhuo commented on pull request #4620: [Feature][Connector-v2][Mongodb]Refactor mongodb connector

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on PR #4620:
URL: https://github.com/apache/incubator-seatunnel/pull/4620#issuecomment-1514361855

   cc @TyrantLucifer @hailin0 PTAL, thanks.




[GitHub] [incubator-seatunnel] TyrantLucifer commented on pull request #4620: [Feature][Connector-v2][Mongodb]Refactor mongodb connector

Posted by "TyrantLucifer (via GitHub)" <gi...@apache.org>.
TyrantLucifer commented on PR #4620:
URL: https://github.com/apache/incubator-seatunnel/pull/4620#issuecomment-1545013515

   IMO, since we will implement DDL changes in SeaTunnel, the write logic of all connectors should not use async writes; the write action should be bundled with the checkpoint and preCommit stages. cc @hailin0 @EricJoy2048
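
   A minimal sketch of what a synchronous, checkpoint-bound write could look like, assuming a writer that buffers rows and flushes them in the caller's thread. `BufferedSyncWriter` and its in-memory `flushed` list are hypothetical stand-ins, not the SeaTunnel or MongoDB API; only the `prepareCommit()` name and the `Optional<Void>` return type are taken from the suggestion in this thread.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Optional;

   // Hypothetical illustration only; not the connector's actual writer.
   final class BufferedSyncWriter {
       private final List<String> buffer = new ArrayList<>();
       // Stands in for the external store: each element is one flushed batch.
       private final List<List<String>> flushed = new ArrayList<>();
       private final int flushSize;

       BufferedSyncWriter(int flushSize) {
           this.flushSize = flushSize;
       }

       // Buffers the row and flushes in the calling thread once the batch is full.
       void write(String row) {
           buffer.add(row);
           if (buffer.size() >= flushSize) {
               flush();
           }
       }

       // Invoked at the checkpoint: no rows remain buffered after this returns.
       Optional<Void> prepareCommit() {
           flush();
           return Optional.empty();
       }

       // Moves the buffered rows into one batch; an empty buffer is a no-op.
       private void flush() {
           if (buffer.isEmpty()) {
               return;
           }
           flushed.add(new ArrayList<>(buffer));
           buffer.clear();
       }

       int pendingRows() {
           return buffer.size();
       }

       int flushedBatches() {
           return flushed.size();
       }
   }
   ```

   Because every flush happens either inside `write` or inside `prepareCommit`, no background thread touches the buffer, so a checkpoint never completes while rows are still pending.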




[GitHub] [incubator-seatunnel] MonsterChenzhuo commented on a diff in pull request #4620: [Feature][Connector-v2][Mongodb]Refactor mongodb connector

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #4620:
URL: https://github.com/apache/incubator-seatunnel/pull/4620#discussion_r1171185414


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java:
##########
@@ -48,54 +38,41 @@
 
 import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoClients;
-import com.mongodb.client.MongoCollection;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
-import java.math.BigDecimal;
 import java.time.Duration;
-import java.time.LocalDate;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
 import static java.net.HttpURLConnection.HTTP_OK;
 import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
 
 @Slf4j
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.FLINK, EngineType.SPARK})

Review Comment:
   @TyrantLucifer I apologize for the inconvenience. The Flink 1.13 image cannot run on a Mac M1, so I have had trouble running the Flink CI locally, although I have validated the connector with Zeta, Flink, and Spark. When I submit the code directly to GitHub, it often fails for various reasons. My plan is to use this annotation to limit the scope first and get Flink running smoothly, and then remove the annotation.





[GitHub] [incubator-seatunnel] MonsterChenzhuo commented on a diff in pull request #4620: [Feature][Connector-v2][Mongodb]Refactor mongodb connector

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #4620:
URL: https://github.com/apache/incubator-seatunnel/pull/4620#discussion_r1172109248


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_to_assert.conf:
##########
@@ -19,55 +19,81 @@
 ######
 
 env {
-  # You can set flink configuration here
   execution.parallelism = 1
   job.mode = "BATCH"
-  #execution.checkpoint.interval = 10000
-  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
 }
 
 source {
-  MongoDB {
-    uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
+  Mongodb {
+    connection = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true"
     database = "test_db"
-    collection = "source_table"
+    collection = "sink_table"
+    no-timeout = true
+    fetch.size = 1000
+    max.time.min = 100
+    result_table_name = "mongodb_table"
     schema = {
       fields {
-        id = bigint
-        c_map = "map<string, smallint>"
-        c_array = "array<tinyint>"
+        c_map = "map<string, string>"
+        c_array = "array<int>"
         c_string = string
         c_boolean = boolean
-        c_tinyint = tinyint
-        c_smallint = smallint
+        c_tinyint = int

Review Comment:
   <img width="414" alt="image" src="https://user-images.githubusercontent.com/60029759/233270135-e14338d2-43a5-436b-aa12-19b34f4837b7.png">
   For example, when a SeaTunnel row is written to MongoDB and read back by the source, the BSON stored in MongoDB does not distinguish between TINYINT and SMALLINT.
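
   BSON defines 32-bit and 64-bit integer types but no 8-bit or 16-bit ones, which is why the narrower types cannot round-trip. A minimal sketch of the effect, assuming (this thread does not confirm it) that narrow integers are widened to `int` before being written; the class and method names are hypothetical:

   ```java
   // Hypothetical illustration only; not the connector's serializer.
   final class BsonIntegerWidening {

       // Widens Byte and Short to Integer (a BSON int32); other values pass through.
       static Object toBsonCompatible(Object value) {
           if (value instanceof Byte || value instanceof Short) {
               return ((Number) value).intValue();
           }
           return value;
       }
   }
   ```

   After such a widening, a value that started as TINYINT or SMALLINT is read back as a plain 32-bit integer, which would explain why the test schema declares these fields as `int`.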





[GitHub] [incubator-seatunnel] MonsterChenzhuo commented on a diff in pull request #4620: [Feature][Connector-v2][Mongodb]Refactor mongodb connector

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #4620:
URL: https://github.com/apache/incubator-seatunnel/pull/4620#discussion_r1172107951


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_to_assert.conf:
##########
@@ -19,55 +19,81 @@
 ######
 
 env {
-  # You can set flink configuration here
   execution.parallelism = 1
   job.mode = "BATCH"
-  #execution.checkpoint.interval = 10000
-  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
 }
 
 source {
-  MongoDB {
-    uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
+  Mongodb {
+    connection = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true"
     database = "test_db"
-    collection = "source_table"
+    collection = "sink_table"
+    no-timeout = true
+    fetch.size = 1000
+    max.time.min = 100
+    result_table_name = "mongodb_table"
     schema = {
       fields {
-        id = bigint
-        c_map = "map<string, smallint>"
-        c_array = "array<tinyint>"
+        c_map = "map<string, string>"
+        c_array = "array<int>"
         c_string = string
         c_boolean = boolean
-        c_tinyint = tinyint
-        c_smallint = smallint
+        c_tinyint = int

Review Comment:
   @TyrantLucifer You can take a look at the documentation of the MongoDB connector. MongoDB data types and SeaTunnel data types do not map one-to-one.





[GitHub] [incubator-seatunnel] MonsterChenzhuo commented on a diff in pull request #4620: [Feature][Connector-v2][Mongodb]Refactor mongodb connector

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #4620:
URL: https://github.com/apache/incubator-seatunnel/pull/4620#discussion_r1189262794


##########
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriter.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbClientProvider;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbCollectionProvider;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentSerializer;
+
+import org.bson.BsonDocument;
+
+import com.mongodb.MongoException;
+import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.WriteModel;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITER_OPERATION_FAILED;
+
+@Slf4j
+public class MongodbWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private MongodbClientProvider collectionProvider;
+
+    private final DocumentSerializer<SeaTunnelRow> serializer;
+
+    private long bulkActions;
+
+    private final List<WriteModel<BsonDocument>> bulkRequests;
+
+    private int maxRetries;
+
+    private long retryIntervalMs;
+
+    private long batchIntervalMs;
+
+    private volatile long lastSendTime = 0L;
+
+    public MongodbWriter(
+            DocumentSerializer<SeaTunnelRow> serializer, MongodbWriterOptions options) {
+        initOptions(options);
+        this.serializer = serializer;
+        this.bulkRequests = new ArrayList<>();
+    }
+
+    private void initOptions(MongodbWriterOptions options) {
+        this.maxRetries = options.getRetryMax();
+        this.retryIntervalMs = options.getRetryInterval();
+        this.collectionProvider =
+                MongodbCollectionProvider.getBuilder()
+                        .connectionString(options.getConnectString())
+                        .database(options.getDatabase())
+                        .collection(options.getCollection())
+                        .build();
+        this.bulkActions = options.getFlushSize();
+        this.batchIntervalMs = options.getBatchIntervalMs();
+    }
+
+    @Override
+    public void write(SeaTunnelRow o) throws IOException {
+        bulkRequests.add(serializer.serializeToWriteModel(o));
+        if (isOverMaxBatchSizeLimit() || isOverMaxBatchIntervalLimit()) {
+            doBulkWrite();
+        }
+    }
+

Review Comment:
   @hailin0 Does your `review` mean what is described in the picture above?





[GitHub] [incubator-seatunnel] TyrantLucifer commented on a diff in pull request #4620: [Feature][Connector-v2][Mongodb]Refactor mongodb connector

Posted by "TyrantLucifer (via GitHub)" <gi...@apache.org>.
TyrantLucifer commented on code in PR #4620:
URL: https://github.com/apache/incubator-seatunnel/pull/4620#discussion_r1191843989


##########
seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/internal/MongodbCollectionProvider.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.internal;
+
+import com.google.common.base.Preconditions;
+
+/** A builder class for creating {@link MongodbClientProvider}. */
+public class MongodbCollectionProvider {
+
+    public static Builder getBuilder() {

Review Comment:
   ```suggestion
       public static Builder builder() {
   ```





[GitHub] [incubator-seatunnel] TyrantLucifer commented on a diff in pull request #4620: [Feature][Connector-v2][Mongodb]Refactor mongodb connector

Posted by "TyrantLucifer (via GitHub)" <gi...@apache.org>.
TyrantLucifer commented on code in PR #4620:
URL: https://github.com/apache/incubator-seatunnel/pull/4620#discussion_r1171097529


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java:
##########
@@ -106,75 +83,6 @@ public void initConnection() {
         client = MongoClients.create(url);
     }
 

Review Comment:
   Add more test cases to verify that the refactor makes sense.



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java:
##########
@@ -48,54 +38,41 @@
 
 import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoClients;
-import com.mongodb.client.MongoCollection;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
-import java.math.BigDecimal;
 import java.time.Duration;
-import java.time.LocalDate;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
 import static java.net.HttpURLConnection.HTTP_OK;
 import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
 
 @Slf4j
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.FLINK, EngineType.SPARK})

Review Comment:
   Why are Spark and Flink disabled?





[GitHub] [incubator-seatunnel] MonsterChenzhuo commented on pull request #4620: [Feature][Connector-v2][Mongodb]Refactor mongodb connector

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on PR #4620:
URL: https://github.com/apache/incubator-seatunnel/pull/4620#issuecomment-1536991055

   <img width="520" alt="image" src="https://user-images.githubusercontent.com/60029759/236594092-203297d4-2230-4a1c-9b62-d03bc15c683c.png">
   Test data: 100 GB, 4.6 million rows (460w).
   MongoDB record test: three EngineServer nodes, executed with a parallelism of 10.

