You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2022/09/07 07:44:09 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connector-V2] Add mongodb connecter source (#2596)

This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3ee8a8a61 [Feature][Connector-V2] Add mongodb connecter source (#2596)
3ee8a8a61 is described below

commit 3ee8a8a619e51a2b3772f8226dd77c8989e52a01
Author: ChunFu Wu <31...@qq.com>
AuthorDate: Wed Sep 7 15:44:02 2022 +0800

    [Feature][Connector-V2] Add mongodb connecter source (#2596)
    
    * Add mongodb connecter v2 source
    
    * Add docs
    
    * Add license header
    
    * Add mongodb e2e
    
    * Add license header
    
    * Add codestyle
    
    * change config file
    
    * Resolve conflicts
    
    * Add mongodb connecter v2 source
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * Fix codestyle
---
 docs/en/connector-v2/source/MongoDB.md             |  76 +++++++++++++
 plugin-mapping.properties                          |   2 +
 pom.xml                                            |   4 +-
 seatunnel-connectors-v2-dist/pom.xml               |   5 +
 .../connector-mongodb}/pom.xml                     |  52 ++++-----
 .../seatunnel/mongodb/config/MongodbConfig.java    |  39 +++++++
 .../mongodb/config/MongodbParameters.java          |  33 ++++++
 .../seatunnel/mongodb/source/MongodbSource.java    | 114 +++++++++++++++++++
 .../mongodb/source/MongodbSourceReader.java        | 103 +++++++++++++++++
 seatunnel-connectors-v2/pom.xml                    |   1 +
 seatunnel-dist/release-docs/LICENSE                |   3 +
 .../{ => connector-mongodb-flink-e2e}/pom.xml      |  44 ++------
 .../flink/v2/mongodb/MongodbSourceToConsoleIT.java | 125 +++++++++++++++++++++
 .../src/test/resources/log4j.properties            |  22 ++++
 .../test/resources/mongodb/mongodb_to_console.conf |  60 ++++++++++
 .../seatunnel-flink-connector-v2-e2e/pom.xml       |   1 +
 tools/dependencies/known-dependencies.txt          |   3 +
 17 files changed, 625 insertions(+), 62 deletions(-)

diff --git a/docs/en/connector-v2/source/MongoDB.md b/docs/en/connector-v2/source/MongoDB.md
new file mode 100644
index 000000000..e587f919a
--- /dev/null
+++ b/docs/en/connector-v2/source/MongoDB.md
@@ -0,0 +1,76 @@
+# MongoDb
+
+> MongoDb source connector
+
+## Description
+
+Read data from MongoDB.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name           | type   | required | default value |
+|----------------|--------|----------|---------------|
+| uri            | string | yes      | -             |
+| database       | string | yes      | -             |
+| collection     | string | yes      | -             |
+| schema         | object | yes      | -             |
+| common-options | string | yes      | -             |
+
+### uri [string]
+
+MongoDB uri
+
+### database [string]
+
+MongoDB database
+
+### collection [string]
+
+MongoDB collection
+
+### schema [object]
+
+Because `MongoDB` does not have the concept of `schema`, when engine reads `MongoDB` , it will sample `MongoDB` data and infer the `schema` . In fact, this process will be slow and may be inaccurate. This parameter can be manually specified. Avoid these problems. 
+
+such as:
+
+```
+schema {
+  fields {
+    id = int
+    key_aa = string
+    key_bb = string
+  }
+}
+```
+
+### common options [string]
+
+Source Plugin common parameters, refer to [Source Plugin](common-options.md) for details
+
+## Example
+
+```bash
+mongodb {
+    uri = "mongodb://username:password@127.0.0.1:27017/mypost?retryWrites=true&writeConcern=majority"
+    database = "mydatabase"
+    collection = "mycollection"
+    schema {
+      fields {
+        id = int
+        key_aa = string
+        key_bb = string
+      }
+    }
+    result_table_name = "mongodb_result_table"
+}
+```
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 78922c4eb..e90f02534 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -126,3 +126,5 @@ seatunnel.source.Redis = connector-redis
 seatunnel.sink.Redis = connector-redis
 seatunnel.sink.DataHub = connector-datahub
 seatunnel.sink.Sentry = connector-sentry
+seatunnel.source.MongoDB = connector-mongodb
+
diff --git a/pom.xml b/pom.xml
index cae9457dd..4a13fe353 100644
--- a/pom.xml
+++ b/pom.xml
@@ -256,7 +256,7 @@
                 <artifactId>commons-collections4</artifactId>
                 <version>${commons-collections4.version}</version>
             </dependency>
-            
+
             <dependency>
                 <groupId>com.beust</groupId>
                 <artifactId>jcommander</artifactId>
@@ -344,7 +344,7 @@
                 <artifactId>slf4j-log4j12</artifactId>
                 <version>${slf4j.version}</version>
             </dependency>
-            
+
             <dependency>
                 <groupId>commons-logging</groupId>
                 <artifactId>commons-logging</artifactId>
diff --git a/seatunnel-connectors-v2-dist/pom.xml b/seatunnel-connectors-v2-dist/pom.xml
index baf154db1..25bec4a20 100644
--- a/seatunnel-connectors-v2-dist/pom.xml
+++ b/seatunnel-connectors-v2-dist/pom.xml
@@ -161,6 +161,11 @@
             <artifactId>connector-sentry</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-mongodb</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-connectors-v2/connector-mongodb/pom.xml
similarity index 59%
copy from seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
copy to seatunnel-connectors-v2/connector-mongodb/pom.xml
index 737e6c323..c6a4287de 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-connectors-v2/connector-mongodb/pom.xml
@@ -1,65 +1,61 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
+
     Licensed to the Apache Software Foundation (ASF) under one or more
     contributor license agreements.  See the NOTICE file distributed with
     this work for additional information regarding copyright ownership.
     The ASF licenses this file to You under the Apache License, Version 2.0
     (the "License"); you may not use this file except in compliance with
     the License.  You may obtain a copy of the License at
+
        http://www.apache.org/licenses/LICENSE-2.0
+
     Unless required by applicable law or agreed to in writing, software
     distributed under the License is distributed on an "AS IS" BASIS,
     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     See the License for the specific language governing permissions and
     limitations under the License.
+
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
     <parent>
-        <artifactId>seatunnel-e2e</artifactId>
         <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connectors-v2</artifactId>
         <version>${revision}</version>
     </parent>
-    <modelVersion>4.0.0</modelVersion>
-    <packaging>pom</packaging>
 
-    <artifactId>seatunnel-flink-connector-v2-e2e</artifactId>
+    <artifactId>connector-mongodb</artifactId>
 
-    <modules>
-        <module>connector-flink-e2e-base</module>
-        <module>connector-redis-flink-e2e</module>
-        <module>connector-file-flink-e2e</module>
-        <module>connector-jdbc-flink-e2e</module>
-        <module>connector-iotdb-flink-e2e</module>
-        <module>connector-datahub-flink-e2e</module>
-        <module>connector-assert-flink-e2e</module>
-        <module>connector-fake-flink-e2e</module>
-    </modules>
+    <properties>
+        <mongodb.version>3.12.11</mongodb.version>
+    </properties>
 
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-core-flink</artifactId>
+            <artifactId>connector-common</artifactId>
             <version>${project.version}</version>
-            <scope>test</scope>
         </dependency>
+
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-connectors-v2-dist</artifactId>
+            <artifactId>seatunnel-format-json</artifactId>
             <version>${project.version}</version>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>*</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
-            <groupId>org.awaitility</groupId>
-            <artifactId>awaitility</artifactId>
-            <scope>test</scope>
+            <groupId>org.mongodb</groupId>
+            <artifactId>mongodb-driver</artifactId>
+            <version>${mongodb.version}</version>
         </dependency>
     </dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
new file mode 100644
index 000000000..45857e85b
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.config;
+
+import java.io.Serializable;
+
+/**
+ * The config of mongodb
+ */
+public class MongodbConfig implements Serializable {
+
+    public static final String URI = "uri";
+
+    public static final String DATABASE = "database";
+
+    public static final String COLLECTION = "collection";
+
+    public static final String SCHEMA = "schema";
+
+    public static final String FORMAT = "format";
+
+    public static final String DEFAULT_FORMAT = "json";
+
+}
diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbParameters.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbParameters.java
new file mode 100644
index 000000000..713d5421e
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbParameters.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.config;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+public class MongodbParameters implements Serializable {
+
+    private String uri;
+
+    private String database;
+
+    private String collection;
+
+}
diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java
new file mode 100644
index 000000000..077097569
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.source;
+
+import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.COLLECTION;
+import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.DATABASE;
+import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.DEFAULT_FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.SCHEMA;
+import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.URI;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
+import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbParameters;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigBeanFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(SeaTunnelSource.class)
+public class MongodbSource extends AbstractSingleSplitSource<SeaTunnelRow> {
+
+    private SeaTunnelContext seaTunnelContext;
+
+    private SeaTunnelRowType rowType;
+
+    private MongodbParameters params;
+
+    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
+
+    @Override
+    public String getPluginName() {
+        return "MongoDB";
+    }
+
+    @Override
+    public void prepare(Config config) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(config, URI, DATABASE, COLLECTION);
+        if (!result.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
+        }
+
+        this.params = ConfigBeanFactory.create(config, MongodbParameters.class);
+
+        if (config.hasPath(SCHEMA)) {
+            Config schema = config.getConfig(SCHEMA);
+            this.rowType = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
+        } else {
+            this.rowType = SeaTunnelSchema.buildSimpleTextSchema();
+        }
+
+        // TODO: use format SPI
+        // default use json format
+        String format;
+        if (config.hasPath(FORMAT)) {
+            format = config.getString(FORMAT);
+            this.deserializationSchema = null;
+        } else {
+            format = DEFAULT_FORMAT;
+            this.deserializationSchema = new JsonDeserializationSchema(false, false, rowType);
+        }
+    }
+
+    @Override
+    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+        this.seaTunnelContext = seaTunnelContext;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        return this.rowType;
+    }
+
+    @Override
+    public AbstractSingleSplitReader<SeaTunnelRow> createReader(SingleSplitReaderContext context) throws Exception {
+        return new MongodbSourceReader(context, this.params, this.deserializationSchema);
+    }
+
+}
diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceReader.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceReader.java
new file mode 100644
index 000000000..0a4bd95e2
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceReader.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbParameters;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCursor;
+import org.bson.Document;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class MongodbSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(MongodbSourceReader.class);
+
+    private final SingleSplitReaderContext context;
+
+    private MongoClient client;
+
+    private final MongodbParameters params;
+
+    private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
+
+    MongodbSourceReader(SingleSplitReaderContext context, MongodbParameters params, DeserializationSchema<SeaTunnelRow> deserializationSchema) {
+        this.context = context;
+        this.params = params;
+        this.deserializationSchema = deserializationSchema;
+    }
+
+    @Override
+    public void open() throws Exception {
+        client = MongoClients.create(params.getUri());
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (client != null) {
+            client.close();
+        }
+    }
+
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        try (MongoCursor<Document> mongoCursor = client.getDatabase(params.getDatabase()).getCollection(params.getCollection()).find().iterator()) {
+
+            while (mongoCursor.hasNext()) {
+                Document doc = mongoCursor.next();
+                HashMap<String, Object> map = new HashMap<>(doc.size());
+                Set<Map.Entry<String, Object>> entries = doc.entrySet();
+                for (Map.Entry<String, Object> entry : entries) {
+                    if (!"_id".equalsIgnoreCase(entry.getKey())) {
+                        String key = entry.getKey();
+                        Object value = entry.getValue();
+                        map.put(key, value);
+                    }
+                }
+                String content = JsonUtils.toJsonString(map);
+                if (deserializationSchema != null) {
+                    deserializationSchema.deserialize(content.getBytes(), output);
+                } else {
+                    // TODO: use seatunnel-text-format
+                    output.collect(new SeaTunnelRow(new Object[]{content}));
+                }
+            }
+        } finally {
+            if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
+                // signal to the source that we have reached the end of the data.
+                LOGGER.info("Closed the bounded mongodb source");
+                context.signalNoMoreElement();
+            }
+        }
+    }
+
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index e1c5b683d..facea7d47 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -53,6 +53,7 @@
         <module>connector-redis</module>
         <module>connector-datahub</module>
         <module>connector-sentry</module>
+        <module>connector-mongodb</module>
     </modules>
 
     <dependencyManagement>
diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE
index 06d2b3285..ecb3c9ed0 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -763,6 +763,9 @@ The text of each license is the standard Apache 2.0 license.
      (The Apache Software License, Version 2.0) Maven Settings (org.apache.maven:maven-settings:3.1.1 - http://maven.apache.org/ref/3.1.1/maven-settings)
      (The Apache Software License, Version 2.0) Maven Settings Builder (org.apache.maven:maven-settings-builder:3.1.1 - http://maven.apache.org/ref/3.1.1/maven-settings-builder)
      (The Apache Software License, Version 2.0) MongoDB Java Driver (org.mongodb:mongo-java-driver:3.4.2 - http://www.mongodb.org)
+     (The Apache Software License, Version 2.0) MongoDB Java Driver (org.mongodb:mongodb-driver:3.12.11 - http://www.mongodb.org)
+     (The Apache Software License, Version 2.0) MongoDB Java Driver Core (org.mongodb:mongodb-driver-core:3.12.11 - http://www.mongodb.org)
+     (The Apache Software License, Version 2.0) BSON (org.mongodb:bson:3.12.11 - https://bsonspec.org)
      (The Apache Software License, Version 2.0) Nimbus JOSE+JWT (com.nimbusds:nimbus-jose-jwt:4.41.1 - https://bitbucket.org/connect2id/nimbus-jose-jwt)
      (The Apache Software License, Version 2.0) Okio (com.squareup.okio:okio:1.17.2 - https://github.com/square/okio/)
      (The Apache Software License, Version 2.0) Phoenix - Spark (org.apache.phoenix:phoenix-spark:5.0.0-HBase-2.0 - http://www.apache.org/phoenix/phoenix-spark/)
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/pom.xml
similarity index 51%
copy from seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
copy to seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/pom.xml
index 737e6c323..a49496c68 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/pom.xml
@@ -13,53 +13,33 @@
     See the License for the specific language governing permissions and
     limitations under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project
+        xmlns="http://maven.apache.org/POM/4.0.0"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
     <parent>
-        <artifactId>seatunnel-e2e</artifactId>
         <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-flink-connector-v2-e2e</artifactId>
         <version>${revision}</version>
     </parent>
-    <modelVersion>4.0.0</modelVersion>
-    <packaging>pom</packaging>
-
-    <artifactId>seatunnel-flink-connector-v2-e2e</artifactId>
 
-    <modules>
-        <module>connector-flink-e2e-base</module>
-        <module>connector-redis-flink-e2e</module>
-        <module>connector-file-flink-e2e</module>
-        <module>connector-jdbc-flink-e2e</module>
-        <module>connector-iotdb-flink-e2e</module>
-        <module>connector-datahub-flink-e2e</module>
-        <module>connector-assert-flink-e2e</module>
-        <module>connector-fake-flink-e2e</module>
-    </modules>
+    <artifactId>connector-mongodb-flink-e2e</artifactId>
 
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-core-flink</artifactId>
+            <artifactId>connector-flink-e2e-base</artifactId>
             <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <type>test-jar</type>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-connectors-v2-dist</artifactId>
+            <artifactId>connector-mongodb</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>*</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.awaitility</groupId>
-            <artifactId>awaitility</artifactId>
-            <scope>test</scope>
         </dependency>
     </dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbSourceToConsoleIT.java
new file mode 100644
index 000000000..1c5dc90ce
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbSourceToConsoleIT.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.mongodb;
+
+import static java.net.HttpURLConnection.HTTP_OK;
+import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import lombok.extern.slf4j.Slf4j;
+import org.bson.Document;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class MongodbSourceToConsoleIT extends FlinkContainer {
+
+    private static final String MONGODB_IMAGE = "mongo:latest";
+
+    private static final String MONGODB_CONTAINER_HOST = "flink_e2e_mongodb_source";
+
+    private static final int MONGODB_PORT = 27017;
+
+    private static final String MONGODB_DATABASE = "test_db";
+
+    private static final String MONGODB_COLLECTION = "test_table";
+
+    private GenericContainer<?> mongodbContainer;
+
+    private MongoClient client;
+
+    @BeforeEach
+    public void startMongoContainer() {
+        DockerImageName imageName = DockerImageName.parse(MONGODB_IMAGE);
+        mongodbContainer = new GenericContainer<>(imageName)
+            .withNetwork(NETWORK)
+            .withNetworkAliases(MONGODB_CONTAINER_HOST)
+            .withExposedPorts(MONGODB_PORT)
+            .waitingFor(new HttpWaitStrategy()
+                .forPort(MONGODB_PORT)
+                .forStatusCodeMatching(response -> response == HTTP_OK || response == HTTP_UNAUTHORIZED)
+                .withStartupTimeout(Duration.ofMinutes(2)))
+            .withLogConsumer(new Slf4jLogConsumer(log));
+        Startables.deepStart(Stream.of(mongodbContainer)).join();
+        log.info("Mongodb container started");
+        Awaitility.given().ignoreExceptions()
+            .await()
+            .atMost(180, TimeUnit.SECONDS)
+            .untilAsserted(this::initConnection);
+        this.generateTestData();
+    }
+
+    public void initConnection() {
+        String host = mongodbContainer.getContainerIpAddress();
+        int port = mongodbContainer.getFirstMappedPort();
+        String url = String.format("mongodb://%s:%d/%s", host, port, MONGODB_DATABASE);
+
+        client = MongoClients.create(url);
+    }
+
+    private void generateTestData() {
+        MongoCollection<Document> mongoCollection = client
+            .getDatabase(MONGODB_DATABASE)
+            .getCollection(MONGODB_COLLECTION);
+
+        mongoCollection.deleteMany(new Document());
+
+        HashMap<String, Object> map = new HashMap<>();
+        map.put("id", 1);
+        map.put("key_aa", "value_aa");
+        map.put("key_bb", "value_bb");
+        Document doc = new Document(map);
+        mongoCollection.insertOne(doc);
+    }
+
+    @Test
+    public void testMongodbSource() throws IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/mongodb/mongodb_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    @AfterEach
+    public void close() {
+        super.close();
+        if (client != null) {
+            client.close();
+        }
+        if (mongodbContainer != null) {
+            mongodbContainer.close();
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/log4j.properties
new file mode 100644
index 000000000..db5d9e512
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_to_console.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_to_console.conf
new file mode 100644
index 000000000..3480d955c
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_to_console.conf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  #job.mode = "BATCH"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the feature source plugin**
+    MongoDB {
+        uri = "mongodb://flink_e2e_mongodb_source:27017/test_db?retryWrites=true&writeConcern=majority"
+        database = "test_db"
+        collection = "test_table"
+        schema {
+          fields {
+            id = int
+            key_aa = string
+            key_bb = string
+          }
+        }
+        result_table_name = "test_table"
+    }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/MongoDB
+}
+
+transform {
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/transform/Sql
+}
+
+sink {
+  Console {}
+
+  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Console
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
index 737e6c323..4f5f037dd 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
@@ -35,6 +35,7 @@
         <module>connector-datahub-flink-e2e</module>
         <module>connector-assert-flink-e2e</module>
         <module>connector-fake-flink-e2e</module>
+        <module>connector-mongodb-flink-e2e</module>
     </modules>
 
     <dependencies>
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 8f4a7db1e..d80df589f 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -434,7 +434,10 @@ memory-0.9.0.jar
 metrics-core-4.2.9.jar
 minlog-1.3.0.jar
 mongo-java-driver-3.4.2.jar
+mongodb-driver-3.12.11.jar
+mongodb-driver-core-3.12.11.jar
 mongo-spark-connector_2.11-2.2.0.jar
+bson-3.12.11.jar
 moshi-1.8.0.jar
 msgpack-core-0.9.0.jar
 mybatis-3.5.9.jar