You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/02 08:48:37 UTC

[incubator-inlong] branch master updated: [INLONG-4273][Manager] Support MongoDB source (#4294)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c4d2f528 [INLONG-4273][Manager] Support MongoDB source (#4294)
4c4d2f528 is described below

commit 4c4d2f528b64ddc2a95d041a26d8bd59662b6966
Author: jiancheng Lv <63...@qq.com>
AuthorDate: Thu Jun 2 16:48:31 2022 +0800

    [INLONG-4273][Manager] Support MongoDB source (#4294)
    
    * [INLONG-4273][Manager] Support MongoDB source
    
    * [INLONG-4273][Manager] Refactor MongoDB source codes
    
    Co-authored-by: healchow <he...@gmail.com>
---
 .../apache/inlong/common/enums/TaskTypeEnum.java   |   4 +
 .../inlong/manager/common/enums/SourceType.java    |   6 +-
 .../common/pojo/source/mongodb/MongoDBSource.java  |  72 ++++++++++++
 .../pojo/source/mongodb/MongoDBSourceDTO.java      |  87 +++++++++++++++
 .../source/mongodb/MongoDBSourceListResponse.java  |  63 +++++++++++
 .../pojo/source/mongodb/MongoDBSourceRequest.java  |  61 +++++++++++
 .../service/sort/util/ExtractNodeUtils.java        | 122 ++++++++++++---------
 .../source/mongo/MongoDBSourceOperation.java       | 101 +++++++++++++++++
 ...viceTest.java => MongoDBSourceServiceTest.java} |  33 +++---
 .../core/source/OracleSourceServiceTest.java       |   9 +-
 .../core/source/SqlServerSourceServiceTest.java    |   8 +-
 11 files changed, 482 insertions(+), 84 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
index 0cb210be9..e59f6805e 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
@@ -28,6 +28,8 @@ public enum TaskTypeEnum {
     POSTGRES(6),
     ORACLE(7),
     SQLSERVER(8),
+    MONGODB(9),
+
 
     ;
 
@@ -57,6 +59,8 @@ public enum TaskTypeEnum {
                 return ORACLE;
             case 8:
                 return SQLSERVER;
+            case 9:
+                return MONGODB;
             default:
                 throw new RuntimeException(String.format("Unsupported taskType=%s", taskType));
         }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
index f5f244aa7..200cc6be6 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
@@ -35,7 +35,10 @@ public enum SourceType {
     PULSAR("PULSAR", TaskTypeEnum.PULSAR),
     POSTGRES("POSTGRES", TaskTypeEnum.POSTGRES),
     ORACLE("ORACLE", TaskTypeEnum.ORACLE),
-    SQLSERVER("SQLSERVER", TaskTypeEnum.SQLSERVER);
+    SQLSERVER("SQLSERVER", TaskTypeEnum.SQLSERVER),
+    MONGODB("MONGODB", TaskTypeEnum.MONGODB),
+
+    ;
 
     public static final String SOURCE_AUTO_PUSH = "AUTO_PUSH";
     public static final String SOURCE_FILE = "FILE";
@@ -46,6 +49,7 @@ public enum SourceType {
     public static final String SOURCE_POSTGRES = "POSTGRES";
     public static final String SOURCE_ORACLE = "ORACLE";
     public static final String SOURCE_SQLSERVER = "SQLSERVER";
+    public static final String SOURCE_MONGODB = "MONGODB";
 
     @Getter
     private final String type;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSource.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSource.java
new file mode 100644
index 000000000..45c4b790c
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSource.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.mongodb;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.pojo.source.StreamSource;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+/**
+ * MongoDB stream source, holding the connection and collection settings of one source.
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "MongoDB source info")
+@JsonTypeDefine(value = SourceType.SOURCE_MONGODB)
+public class MongoDBSource extends StreamSource {
+
+    @ApiModelProperty("MongoDB primaryKey")
+    private String primaryKey;
+
+    @ApiModelProperty("MongoDB hosts")
+    private String hosts;
+
+    @ApiModelProperty("MongoDB username")
+    private String username;
+
+    @ApiModelProperty("MongoDB password")
+    private String password;
+
+    @ApiModelProperty("MongoDB database")
+    private String database;
+
+    @ApiModelProperty("MongoDB collection")
+    private String collection;
+
+    public MongoDBSource() {
+        // Use the type string rather than the enum constant name, as MongoDBSourceListResponse does.
+        this.setSourceType(SourceType.MONGODB.getType());
+    }
+
+    @Override
+    public SourceRequest genSourceRequest() {
+        return CommonBeanUtils.copyProperties(this, MongoDBSourceRequest::new);
+    }
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceDTO.java
new file mode 100644
index 000000000..20033831e
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceDTO.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.mongodb;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * MongoDB source info that is serialized to / parsed from a JSON string.
+ */
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class MongoDBSourceDTO {
+
+    // Configured once at class init: reconfiguring a shared ObjectMapper after first use is not thread-safe.
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+    @ApiModelProperty("MongoDB primaryKey")
+    private String primaryKey;
+
+    @ApiModelProperty("MongoDB hosts")
+    private String hosts;
+
+    @ApiModelProperty("MongoDB username")
+    private String username;
+
+    @ApiModelProperty("MongoDB password")
+    private String password;
+
+    @ApiModelProperty("MongoDB database")
+    private String database;
+
+    @ApiModelProperty("MongoDB collection")
+    private String collection;
+
+    /**
+     * Get the dto instance from the request, copying every MongoDB-specific field
+     */
+    public static MongoDBSourceDTO getFromRequest(MongoDBSourceRequest request) {
+        return MongoDBSourceDTO.builder()
+                .primaryKey(request.getPrimaryKey())
+                .hosts(request.getHosts())
+                .username(request.getUsername())
+                .password(request.getPassword())
+                .database(request.getDatabase())
+                .collection(request.getCollection())
+                .build();
+    }
+
+    /**
+     * Get the dto instance from the JSON string (unknown properties are ignored); throws BusinessException on failure
+     */
+    public static MongoDBSourceDTO getFromJson(@NotNull String extParams) {
+        try {
+            return OBJECT_MAPPER.readValue(extParams, MongoDBSourceDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+        }
+    }
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceListResponse.java
new file mode 100644
index 000000000..c3451e761
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceListResponse.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.mongodb;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.experimental.SuperBuilder;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+/**
+ * One MongoDB source entry in a paging list response.
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Response of MongoDB paging list")
+@JsonTypeDefine(SourceType.SOURCE_MONGODB)
+public class MongoDBSourceListResponse extends SourceListResponse {
+
+    @ApiModelProperty(value = "MongoDB primaryKey")
+    private String primaryKey;
+
+    @ApiModelProperty(value = "MongoDB hosts")
+    private String hosts;
+
+    @ApiModelProperty(value = "MongoDB username")
+    private String username;
+
+    @ApiModelProperty(value = "MongoDB password")
+    private String password;
+
+    @ApiModelProperty(value = "MongoDB database")
+    private String database;
+
+    @ApiModelProperty(value = "MongoDB collection")
+    private String collection;
+
+    public MongoDBSourceListResponse() {
+        setSourceType(SourceType.MONGODB.getType());
+    }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceRequest.java
new file mode 100644
index 000000000..83cd0bf77
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceRequest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.mongodb;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+/**
+ * Request carrying the MongoDB-specific settings of a source.
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Request of MongoDB source")
+@JsonTypeDefine(value = SourceType.SOURCE_MONGODB)
+public class MongoDBSourceRequest extends SourceRequest {
+
+    @ApiModelProperty("MongoDB primaryKey")
+    private String primaryKey;
+
+    @ApiModelProperty("MongoDB hosts")
+    private String hosts;
+
+    @ApiModelProperty("MongoDB username")
+    private String username;
+
+    @ApiModelProperty("MongoDB password")
+    private String password;
+
+    @ApiModelProperty("MongoDB database")
+    private String database;
+
+    @ApiModelProperty("MongoDB collection")
+    private String collection;
+
+    public MongoDBSourceRequest() {
+        // Use the explicit type string instead of toString(), as MongoDBSourceListResponse does.
+        this.setSourceType(SourceType.MONGODB.getType());
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
index c31b45991..3f3c44011 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
@@ -28,6 +28,7 @@ import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.pojo.source.StreamSource;
 import org.apache.inlong.manager.common.pojo.source.kafka.KafkaOffset;
 import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSource;
+import org.apache.inlong.manager.common.pojo.source.mongodb.MongoDBSource;
 import org.apache.inlong.manager.common.pojo.source.mysql.MySQLBinlogSource;
 import org.apache.inlong.manager.common.pojo.source.oracle.OracleSource;
 import org.apache.inlong.manager.common.pojo.source.postgres.PostgresSource;
@@ -40,6 +41,7 @@ import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
 import org.apache.inlong.sort.protocol.enums.PulsarScanStartupMode;
 import org.apache.inlong.sort.protocol.node.ExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.MongoExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.OracleExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode;
@@ -89,6 +91,8 @@ public class ExtractNodeUtils {
                 return createExtractNode((OracleSource) sourceInfo);
             case SQLSERVER:
                 return createExtractNode((SqlServerSource) sourceInfo);
+            case MONGODB:
+                return createExtractNode((MongoDBSource) sourceInfo);
             default:
                 throw new IllegalArgumentException(
                         String.format("Unsupported sourceType=%s to create extractNode", sourceType));
@@ -229,7 +233,7 @@ public class ExtractNodeUtils {
                 .collect(Collectors.toList());
         String topic = pulsarSource.getTopic();
 
-        Format format = null;
+        Format format;
         DataTypeEnum dataType = DataTypeEnum.forName(pulsarSource.getSerializationType());
         switch (dataType) {
             case CSV:
@@ -295,84 +299,98 @@ public class ExtractNodeUtils {
     }
 
     /**
-     * Create Oracle extract node 
+     * Create Oracle extract node
      *
-     * @param oracleSource Oracle source info
+     * @param source Oracle source info
      * @return oracle extract node info
      */
-    public static OracleExtractNode createExtractNode(OracleSource oracleSource) {
-        final String id = oracleSource.getSourceName();
-        final String name = oracleSource.getSourceName();
-        final String database = oracleSource.getDatabase();
-        final String schemaName = oracleSource.getSchemaName();
-        final String tableName = oracleSource.getTableName();
-        final String primaryKey = oracleSource.getPrimaryKey();
-        final String hostName = oracleSource.getHostname();
-        final String userName = oracleSource.getUsername();
-        final String password = oracleSource.getPassword();
-        final Integer port = oracleSource.getPort();
-        ScanStartUpMode scanStartupMode = StringUtils.isBlank(oracleSource.getScanStartupMode())
-                ? null : ScanStartUpMode.forName(oracleSource.getScanStartupMode());
-        List<StreamField> streamFieldInfos = oracleSource.getFieldList();
+    public static OracleExtractNode createExtractNode(OracleSource source) {
+        String name = source.getSourceName();
+        List<StreamField> streamFieldInfos = source.getFieldList();
         final List<FieldInfo> fieldInfos = streamFieldInfos.stream()
                 .map(streamFieldInfo -> FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name))
                 .collect(Collectors.toList());
+
         Map<String, String> properties = Maps.newHashMap();
-        return new OracleExtractNode(id,
+        ScanStartUpMode scanStartupMode = StringUtils.isBlank(source.getScanStartupMode())
+                ? null : ScanStartUpMode.forName(source.getScanStartupMode());
+
+        return new OracleExtractNode(
+                name,
                 name,
                 fieldInfos,
                 null,
                 properties,
-                primaryKey,
-                hostName,
-                userName,
-                password,
-                database,
-                schemaName,
-                tableName,
-                port,
-                scanStartupMode);
+                source.getPrimaryKey(),
+                source.getHostname(),
+                source.getUsername(),
+                source.getPassword(),
+                source.getDatabase(),
+                source.getSchemaName(),
+                source.getTableName(),
+                source.getPort(),
+                scanStartupMode
+        );
     }
 
     /**
      * Create SqlServer extract node
      *
-     * @param sqlServerSource SqlServer source info
+     * @param source SqlServer source info
      * @return SqlServer extract node info
      */
-    public static SqlServerExtractNode createExtractNode(SqlServerSource sqlServerSource) {
-        final String id = sqlServerSource.getSourceName();
-        final String name = sqlServerSource.getSourceName();
-        final String database = sqlServerSource.getDatabase();
-        final String primaryKey = sqlServerSource.getPrimaryKey();
-        final String hostName = sqlServerSource.getHostname();
-        final String userName = sqlServerSource.getUsername();
-        final String password = sqlServerSource.getPassword();
-        final Integer port = sqlServerSource.getPort();
-        final String schemaName = sqlServerSource.getSchemaName();
-
-        String tablename = sqlServerSource.getTableName();
-        List<StreamField> streamFields = sqlServerSource.getFieldList();
+    public static SqlServerExtractNode createExtractNode(SqlServerSource source) {
+        String name = source.getSourceName();
+        List<StreamField> streamFields = source.getFieldList();
         List<FieldInfo> fieldInfos = streamFields.stream()
                 .map(fieldInfo -> FieldInfoUtils.parseStreamFieldInfo(fieldInfo, name))
                 .collect(Collectors.toList());
-        final String serverTimeZone = sqlServerSource.getServerTimezone();
 
         Map<String, String> properties = Maps.newHashMap();
-        return new SqlServerExtractNode(id,
+        return new SqlServerExtractNode(
+                name,
                 name,
                 fieldInfos,
                 null,
                 properties,
-                primaryKey,
-                hostName,
-                port,
-                userName,
-                password,
-                database,
-                schemaName,
-                tablename,
-                serverTimeZone);
+                source.getPrimaryKey(),
+                source.getHostname(),
+                source.getPort(),
+                source.getUsername(),
+                source.getPassword(),
+                source.getDatabase(),
+                source.getSchemaName(),
+                source.getTableName(),
+                source.getServerTimezone()
+        );
+    }
+
+    /**
+     * Build the MongoDB extract node for the given source.
+     *
+     * @param source MongoDB source info; its source name is passed as the first two constructor arguments
+     * @return MongoDB extract node built from the source's field list and connection settings
+     */
+    public static MongoExtractNode createExtractNode(MongoDBSource source) {
+        final String nodeName = source.getSourceName();
+        final List<FieldInfo> fields = source.getFieldList().stream()
+                .map(field -> FieldInfoUtils.parseStreamFieldInfo(field, nodeName))
+                .collect(Collectors.toList());
+        final Map<String, String> properties = Maps.newHashMap();
+
+        return new MongoExtractNode(
+                nodeName,
+                nodeName,
+                fields,
+                null,
+                properties,
+                source.getPrimaryKey(),
+                source.getCollection(),
+                source.getHosts(),
+                source.getUsername(),
+                source.getPassword(),
+                source.getDatabase()
+        );
     }
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mongo/MongoDBSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mongo/MongoDBSourceOperation.java
new file mode 100644
index 000000000..10f66be1c
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mongo/MongoDBSourceOperation.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.source.mongo;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageInfo;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.pojo.source.StreamSource;
+import org.apache.inlong.manager.common.pojo.source.mongodb.MongoDBSource;
+import org.apache.inlong.manager.common.pojo.source.mongodb.MongoDBSourceDTO;
+import org.apache.inlong.manager.common.pojo.source.mongodb.MongoDBSourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.mongodb.MongoDBSourceRequest;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.service.source.AbstractSourceOperation;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.function.Supplier;
+
+/**
+ * Source operation for MongoDB: converts between requests, entities and responses.
+ */
+@Service
+public class MongoDBSourceOperation extends AbstractSourceOperation {
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    public Boolean accept(SourceType sourceType) {
+        return sourceType == SourceType.MONGODB;
+    }
+
+    @Override
+    protected String getSourceType() {
+        return SourceType.MONGODB.getType();
+    }
+
+    @Override
+    protected StreamSource getSource() {
+        return new MongoDBSource();
+    }
+
+    @Override
+    public PageInfo<? extends SourceListResponse> getPageInfo(Page<StreamSourceEntity> entityPage) {
+        if (CollectionUtils.isNotEmpty(entityPage)) {
+            return entityPage.toPageInfo(entity -> this.getFromEntity(entity, MongoDBSourceListResponse::new));
+        }
+        return new PageInfo<>();
+    }
+
+    @Override
+    protected void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity) {
+        final MongoDBSourceRequest mongoRequest = (MongoDBSourceRequest) request;
+        CommonBeanUtils.copyProperties(mongoRequest, targetEntity, true);
+        try {
+            final MongoDBSourceDTO extInfo = MongoDBSourceDTO.getFromRequest(mongoRequest);
+            targetEntity.setExtParams(objectMapper.writeValueAsString(extInfo));
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+        }
+    }
+
+    @Override
+    public <T> T getFromEntity(StreamSourceEntity entity, Supplier<T> target) {
+        final T instance = target.get();
+        if (entity != null) {
+            final String actualType = entity.getSourceType();
+            final String expectedType = getSourceType();
+            Preconditions.checkTrue(expectedType.equals(actualType),
+                    String.format(ErrorCodeEnum.SOURCE_TYPE_NOT_SAME.getMessage(), expectedType, actualType));
+            final MongoDBSourceDTO extInfo = MongoDBSourceDTO.getFromJson(entity.getExtParams());
+            CommonBeanUtils.copyProperties(entity, instance, true);
+            CommonBeanUtils.copyProperties(extInfo, instance, true);
+        }
+        return instance;
+    }
+}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/OracleSourceServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/MongoDBSourceServiceTest.java
similarity index 75%
copy from inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/OracleSourceServiceTest.java
copy to inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/MongoDBSourceServiceTest.java
index c467001a8..30661f729 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/OracleSourceServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/MongoDBSourceServiceTest.java
@@ -19,8 +19,8 @@ package org.apache.inlong.manager.service.core.source;
 
 import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.pojo.source.StreamSource;
-import org.apache.inlong.manager.common.pojo.source.oracle.OracleSource;
-import org.apache.inlong.manager.common.pojo.source.oracle.OracleSourceRequest;
+import org.apache.inlong.manager.common.pojo.source.mongodb.MongoDBSource;
+import org.apache.inlong.manager.common.pojo.source.mongodb.MongoDBSourceRequest;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.service.ServiceBaseTest;
 import org.apache.inlong.manager.service.core.impl.InlongStreamServiceTest;
@@ -30,17 +30,14 @@ import org.junit.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 
 /**
- * Oracle source service test
+ * MongoDB source service test
  */
-public class OracleSourceServiceTest extends ServiceBaseTest {
+public class MongoDBSourceServiceTest extends ServiceBaseTest {
 
-    private final String sourceName = "stream_source_service_test";
     private static final String hostname = "127.0.0.1";
-    private static final Integer port = 1521;
-    private static final String database = "oracle_database";
-    private static final String schema = "oracle_schema";
-    private static final String tablename = "oracle_tablename";
-
+    private static final String database = "mongo_database";
+    private static final String collection = "mongo_collection";
+    private final String sourceName = "stream_source_service_test";
     @Autowired
     private StreamSourceService sourceService;
     @Autowired
@@ -52,16 +49,14 @@ public class OracleSourceServiceTest extends ServiceBaseTest {
     public Integer saveSource() {
         streamServiceTest.saveInlongStream(GLOBAL_GROUP_ID, GLOBAL_STREAM_ID, GLOBAL_OPERATOR);
 
-        OracleSourceRequest sourceInfo = new OracleSourceRequest();
+        MongoDBSourceRequest sourceInfo = new MongoDBSourceRequest();
         sourceInfo.setInlongGroupId(GLOBAL_GROUP_ID);
         sourceInfo.setInlongStreamId(GLOBAL_STREAM_ID);
         sourceInfo.setSourceName(sourceName);
-        sourceInfo.setSourceType(SourceType.ORACLE.getType());
-        sourceInfo.setHostname(hostname);
+        sourceInfo.setSourceType(SourceType.MONGODB.getType());
+        sourceInfo.setHosts(hostname);
         sourceInfo.setDatabase(database);
-        sourceInfo.setTableName(tablename);
-        sourceInfo.setPort(port);
-        sourceInfo.setSchemaName(schema);
+        sourceInfo.setCollection(collection);
         return sourceService.save(sourceInfo, GLOBAL_OPERATOR);
     }
 
@@ -77,7 +72,6 @@ public class OracleSourceServiceTest extends ServiceBaseTest {
     @Test
     public void testListByIdentifier() {
         Integer id = this.saveSource();
-
         StreamSource source = sourceService.get(id);
         Assert.assertEquals(GLOBAL_GROUP_ID, source.getInlongGroupId());
 
@@ -90,9 +84,8 @@ public class OracleSourceServiceTest extends ServiceBaseTest {
         StreamSource response = sourceService.get(id);
         Assert.assertEquals(GLOBAL_GROUP_ID, response.getInlongGroupId());
 
-        OracleSource oracleSource = (OracleSource) response;
-
-        OracleSourceRequest request = CommonBeanUtils.copyProperties(oracleSource, OracleSourceRequest::new);
+        MongoDBSource mongoDBSource = (MongoDBSource) response;
+        MongoDBSourceRequest request = CommonBeanUtils.copyProperties(mongoDBSource, MongoDBSourceRequest::new);
         boolean result = sourceService.update(request, GLOBAL_OPERATOR);
         Assert.assertTrue(result);
 
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/OracleSourceServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/OracleSourceServiceTest.java
index c467001a8..44180badd 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/OracleSourceServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/OracleSourceServiceTest.java
@@ -34,13 +34,12 @@ import org.springframework.beans.factory.annotation.Autowired;
  */
 public class OracleSourceServiceTest extends ServiceBaseTest {
 
-    private final String sourceName = "stream_source_service_test";
     private static final String hostname = "127.0.0.1";
     private static final Integer port = 1521;
     private static final String database = "oracle_database";
     private static final String schema = "oracle_schema";
-    private static final String tablename = "oracle_tablename";
-
+    private static final String tableName = "oracle_table";
+    private final String sourceName = "stream_source_service_test";
     @Autowired
     private StreamSourceService sourceService;
     @Autowired
@@ -59,7 +58,7 @@ public class OracleSourceServiceTest extends ServiceBaseTest {
         sourceInfo.setSourceType(SourceType.ORACLE.getType());
         sourceInfo.setHostname(hostname);
         sourceInfo.setDatabase(database);
-        sourceInfo.setTableName(tablename);
+        sourceInfo.setTableName(tableName);
         sourceInfo.setPort(port);
         sourceInfo.setSchemaName(schema);
         return sourceService.save(sourceInfo, GLOBAL_OPERATOR);
@@ -77,7 +76,6 @@ public class OracleSourceServiceTest extends ServiceBaseTest {
     @Test
     public void testListByIdentifier() {
         Integer id = this.saveSource();
-
         StreamSource source = sourceService.get(id);
         Assert.assertEquals(GLOBAL_GROUP_ID, source.getInlongGroupId());
 
@@ -91,7 +89,6 @@ public class OracleSourceServiceTest extends ServiceBaseTest {
         Assert.assertEquals(GLOBAL_GROUP_ID, response.getInlongGroupId());
 
         OracleSource oracleSource = (OracleSource) response;
-
         OracleSourceRequest request = CommonBeanUtils.copyProperties(oracleSource, OracleSourceRequest::new);
         boolean result = sourceService.update(request, GLOBAL_OPERATOR);
         Assert.assertTrue(result);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/SqlServerSourceServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/SqlServerSourceServiceTest.java
index 898c97361..91d8b718b 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/SqlServerSourceServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/SqlServerSourceServiceTest.java
@@ -34,13 +34,12 @@ import org.springframework.beans.factory.annotation.Autowired;
  */
 public class SqlServerSourceServiceTest extends ServiceBaseTest {
 
-    private final String sourceName = "stream_source_service_test";
     private static final String hostname = "127.0.0.1";
     private static final Integer port = 1521;
     private static final String database = "sqlserver_database";
     private static final String schema = "sqlserver_schema";
-    private static final String tablename = "sqlserver_tablename";
-
+    private static final String tableName = "sqlserver_table";
+    private final String sourceName = "stream_source_service_test";
     @Autowired
     private StreamSourceService sourceService;
     @Autowired
@@ -59,7 +58,7 @@ public class SqlServerSourceServiceTest extends ServiceBaseTest {
         sourceInfo.setSourceType(SourceType.SQLSERVER.getType());
         sourceInfo.setHostname(hostname);
         sourceInfo.setDatabase(database);
-        sourceInfo.setTableName(tablename);
+        sourceInfo.setTableName(tableName);
         sourceInfo.setPort(port);
         sourceInfo.setSchemaName(schema);
         return sourceService.save(sourceInfo, GLOBAL_OPERATOR);
@@ -90,7 +89,6 @@ public class SqlServerSourceServiceTest extends ServiceBaseTest {
         Assert.assertEquals(GLOBAL_GROUP_ID, response.getInlongGroupId());
 
         SqlServerSource sqlserverSource = (SqlServerSource) response;
-
         SqlServerSourceRequest request = CommonBeanUtils.copyProperties(sqlserverSource, SqlServerSourceRequest::new);
         boolean result = sourceService.update(request, GLOBAL_OPERATOR);
         Assert.assertTrue(result);