You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/14 02:11:51 UTC

[inlong] branch master updated: [INLONG-5045][Agent] Support collect data from MongoDB (#5674)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new aabb0aa99 [INLONG-5045][Agent] Support collect data from MongoDB (#5674)
aabb0aa99 is described below

commit aabb0aa99bebabcda9d07934d8440062e06894a8
Author: seedscoder <se...@gmail.com>
AuthorDate: Wed Sep 14 10:11:46 2022 +0800

    [INLONG-5045][Agent] Support collect data from MongoDB (#5674)
---
 .../apache/inlong/agent/constant/JobConstants.java |  30 ++
 .../inlong/agent/pojo/DebeziumSourceFormat.java    |   4 +
 .../apache/inlong/agent/pojo/JobProfileDto.java    |  57 ++++
 .../org/apache/inlong/agent/pojo/MongoJob.java     | 115 +++++++
 .../org/apache/inlong/agent/utils/GsonUtil.java    |  57 ++++
 inlong-agent/agent-plugins/pom.xml                 |   6 +
 .../inlong/agent/plugin/sources/MongoDBSource.java |  43 +++
 .../agent/plugin/sources/reader/BinlogReader.java  |   2 +-
 .../agent/plugin/sources/reader/MongoDBReader.java | 377 +++++++++++++++++++++
 .../plugin/sources/reader/PostgreSQLReader.java    |   2 +-
 ...eSQLSnapshotBase.java => AbstractSnapshot.java} |  67 ++--
 .../sources/snapshot/BinlogSnapshotBase.java       |  75 +---
 .../sources/snapshot/MongoDBSnapshotBase.java      |  64 ++++
 .../sources/snapshot/PostgreSQLSnapshotBase.java   |  70 +---
 .../sources/PostgreSQLOffsetManagerTest.java       |   2 +-
 .../plugin/sources/TestBinlogOffsetManager.java    |   2 +-
 .../agent/plugin/sources/TestMongoDBReader.java    | 119 +++++++
 licenses/inlong-agent/LICENSE                      |   1 +
 pom.xml                                            |   7 +
 19 files changed, 930 insertions(+), 170 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
index 1894d1507..f6a5d989e 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
@@ -89,6 +89,36 @@ public class JobConstants extends CommonConstants {
     public static final String JOB_KAFKA_READ_TIMEOUT = "job.kafkaJob.read.timeout";
     public static final String JOB_KAFKA_AUTO_COMMIT_OFFSET_RESET = "job.kafkaJob.autoOffsetReset";
 
+
+    // Keys of the MongoDB job options inside a job profile.
+    // MongoDBReader reads the job configuration through these keys.
+    public static final String JOB_MONGO_HOSTS = "job.mongoJob.hosts";
+    public static final String JOB_MONGO_USER = "job.mongoJob.user";
+    public static final String JOB_MONGO_PASSWORD = "job.mongoJob.password";
+    public static final String JOB_MONGO_DATABASE_INCLUDE_LIST = "job.mongoJob.databaseIncludeList";
+    public static final String JOB_MONGO_DATABASE_EXCLUDE_LIST = "job.mongoJob.databaseExcludeList";
+    public static final String JOB_MONGO_COLLECTION_INCLUDE_LIST = "job.mongoJob.collectionIncludeList";
+    public static final String JOB_MONGO_COLLECTION_EXCLUDE_LIST = "job.mongoJob.collectionExcludeList";
+    public static final String JOB_MONGO_FIELD_EXCLUDE_LIST = "job.mongoJob.fieldExcludeList";
+    // NOTE(review): MongoJob keeps these two values in nested holders (snapshot.mode / capture.mode),
+    // and the history/offset keys below use the dotted nested form. Confirm that the flat
+    // "snapshotMode" / "captureMode" keys really match the keys of the parsed job profile.
+    public static final String JOB_MONGO_SNAPSHOT_MODE = "job.mongoJob.snapshotMode";
+    public static final String JOB_MONGO_CAPTURE_MODE = "job.mongoJob.captureMode";
+    public static final String JOB_MONGO_QUEUE_SIZE = "job.mongoJob.queueSize";
+    public static final String JOB_MONGO_STORE_HISTORY_FILENAME = "job.mongoJob.history.filename";
+    public static final String JOB_MONGO_OFFSET_SPECIFIC_OFFSET_FILE = "job.mongoJob.offset.specificOffsetFile";
+    public static final String JOB_MONGO_OFFSET_SPECIFIC_OFFSET_POS = "job.mongoJob.offset.specificOffsetPos";
+    public static final String JOB_MONGO_OFFSETS = "job.mongoJob.offsets";
+    public static final String JOB_MONGO_CONNECT_TIMEOUT_MS = "job.mongoJob.connectTimeoutInMs";
+    public static final String JOB_MONGO_CURSOR_MAX_AWAIT = "job.mongoJob.cursorMaxAwaitTimeInMs";
+    public static final String JOB_MONGO_SOCKET_TIMEOUT = "job.mongoJob.socketTimeoutInMs";
+    public static final String JOB_MONGO_SELECTION_TIMEOUT = "job.mongoJob.selectionTimeoutInMs";
+    public static final String JOB_MONGO_FIELD_RENAMES = "job.mongoJob.fieldRenames";
+    public static final String JOB_MONGO_MEMBERS_DISCOVER = "job.mongoJob.membersAutoDiscover";
+    public static final String JOB_MONGO_CONNECT_MAX_ATTEMPTS = "job.mongoJob.connectMaxAttempts";
+    public static final String JOB_MONGO_BACKOFF_MAX_DELAY = "job.mongoJob.connectBackoffMaxDelayInMs";
+    public static final String JOB_MONGO_BACKOFF_INITIAL_DELAY = "job.mongoJob.connectBackoffInitialDelayInMs";
+    public static final String JOB_MONGO_INITIAL_SYNC_MAX_THREADS = "job.mongoJob.initialSyncMaxThreads";
+    public static final String JOB_MONGO_SSL_INVALID_HOSTNAME_ALLOWED = "job.mongoJob.sslInvalidHostnameAllowed";
+    public static final String JOB_MONGO_SSL_ENABLE = "job.mongoJob.sslEnabled";
+    public static final String JOB_MONGO_POLL_INTERVAL = "job.mongoJob.pollIntervalInMs";
+
     public static final Long JOB_KAFKA_DEFAULT_OFFSET = 0L;
 
     // job type, delete/add
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/DebeziumSourceFormat.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/DebeziumSourceFormat.java
index 0e0eb9489..97c7bab17 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/DebeziumSourceFormat.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/DebeziumSourceFormat.java
@@ -29,5 +29,9 @@ public class DebeziumSourceFormat {
     private String db;
 
     private String table;
+    /**
+     * mongo source metadata: the collection a change event belongs to
+     * (presumably its name, the MongoDB counterpart of {@code table} - TODO confirm)
+     */
+    private String collection;
 
 }
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index e1a1482f0..80ed18d95 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -50,6 +50,10 @@ public class JobProfileDto {
      * kafka source
      */
     public static final String KAFKA_SOURCE = "org.apache.inlong.agent.plugin.sources.KafkaSource";
+    /**
+     * mongo source: fully-qualified class name of the MongoDB source plugin
+     */
+    public static final String MONGO_SOURCE = "org.apache.inlong.agent.plugin.sources.MongoDBSource";
 
     private static final Gson GSON = new Gson();
 
@@ -171,6 +175,52 @@ public class JobProfileDto {
         return kafkaJob;
     }
 
+    /**
+     * Builds the mongo job from the ext params of a task config.
+     * <p>
+     * The ext params JSON is parsed into a flat {@link MongoJob.MongoJobTaskConfig}; its values are
+     * copied one-to-one, with the offset, snapshot, capture and history options regrouped into the
+     * nested holders of {@link MongoJob}.
+     *
+     * @param dataConfigs task config whose ext params carry the mongo options as JSON
+     * @return the populated mongo job
+     */
+    private static MongoJob getMongoJob(DataConfig dataConfigs) {
+        MongoJob.MongoJobTaskConfig config = GSON.fromJson(dataConfigs.getExtParams(),
+                MongoJob.MongoJobTaskConfig.class);
+        MongoJob mongoJob = new MongoJob();
+
+        mongoJob.setHosts(config.getHosts());
+        mongoJob.setUser(config.getUser());
+        mongoJob.setPassword(config.getPassword());
+        mongoJob.setDatabaseIncludeList(config.getDatabaseIncludeList());
+        mongoJob.setDatabaseExcludeList(config.getDatabaseExcludeList());
+        mongoJob.setCollectionIncludeList(config.getCollectionIncludeList());
+        mongoJob.setCollectionExcludeList(config.getCollectionExcludeList());
+        mongoJob.setFieldExcludeList(config.getFieldExcludeList());
+        mongoJob.setConnectTimeoutInMs(config.getConnectTimeoutInMs());
+        mongoJob.setQueueSize(config.getQueueSize());
+        mongoJob.setCursorMaxAwaitTimeInMs(config.getCursorMaxAwaitTimeInMs());
+        mongoJob.setSocketTimeoutInMs(config.getSocketTimeoutInMs());
+        mongoJob.setSelectionTimeoutInMs(config.getSelectionTimeoutInMs());
+        mongoJob.setFieldRenames(config.getFieldRenames());
+        mongoJob.setMembersAutoDiscover(config.getMembersAutoDiscover());
+        mongoJob.setConnectMaxAttempts(config.getConnectMaxAttempts());
+        mongoJob.setConnectBackoffMaxDelayInMs(config.getConnectBackoffMaxDelayInMs());
+        mongoJob.setConnectBackoffInitialDelayInMs(config.getConnectBackoffInitialDelayInMs());
+        mongoJob.setInitialSyncMaxThreads(config.getInitialSyncMaxThreads());
+        mongoJob.setSslInvalidHostnameAllowed(config.getSslInvalidHostnameAllowed());
+        mongoJob.setSslEnabled(config.getSslEnabled());
+        mongoJob.setPollIntervalInMs(config.getPollIntervalInMs());
+
+        MongoJob.Offset offset = new MongoJob.Offset();
+        offset.setFilename(config.getOffsetFilename());
+        offset.setSpecificOffsetFile(config.getSpecificOffsetFile());
+        offset.setSpecificOffsetPos(config.getSpecificOffsetPos());
+        mongoJob.setOffset(offset);
+
+        MongoJob.Snapshot snapshot = new MongoJob.Snapshot();
+        snapshot.setMode(config.getSnapshotMode());
+        mongoJob.setSnapshot(snapshot);
+
+        // the capture mode is part of the task config as well; without this copy it was silently dropped
+        MongoJob.Capture capture = new MongoJob.Capture();
+        capture.setMode(config.getCaptureMode());
+        mongoJob.setCapture(capture);
+
+        MongoJob.History history = new MongoJob.History();
+        history.setFilename(config.getHistoryFilename());
+        mongoJob.setHistory(history);
+
+        return mongoJob;
+    }
+
     private static Proxy getProxy(DataConfig dataConfigs) {
         Proxy proxy = new Proxy();
         Manager manager = new Manager();
@@ -231,6 +281,12 @@ public class JobProfileDto {
                 job.setSource(KAFKA_SOURCE);
                 profileDto.setJob(job);
                 break;
+            case MONGODB:
+                MongoJob mongoJob = getMongoJob(dataConfig);
+                job.setMongoJob(mongoJob);
+                job.setSource(MONGO_SOURCE);
+                profileDto.setJob(job);
+                break;
             default:
         }
         return TriggerProfile.parseJsonStr(GSON.toJson(profileDto));
@@ -257,6 +313,7 @@ public class JobProfileDto {
         private FileJob fileJob;
         private BinlogJob binlogJob;
         private KafkaJob kafkaJob;
+        private MongoJob mongoJob;
     }
 
     @Data
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/MongoJob.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/MongoJob.java
new file mode 100644
index 000000000..087162d05
--- /dev/null
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/MongoJob.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.pojo;
+
+import lombok.Data;
+
+/**
+ * MongoJob : mongo job, the MongoDB section of a job profile.
+ * <p>
+ * Every option is kept as a plain String. Options that belong together are grouped in the nested
+ * {@link Offset}, {@link Snapshot}, {@link Capture} and {@link History} holders, while
+ * {@link MongoJobTaskConfig} is the flat form in which the same options arrive from a task config.
+ */
+@Data
+public class MongoJob {
+
+    // connection and authentication
+    private String hosts;
+    private String user;
+    private String password;
+    // database / collection / field filters
+    private String databaseIncludeList;
+    private String databaseExcludeList;
+    private String collectionIncludeList;
+    private String collectionExcludeList;
+    private String fieldExcludeList;
+    // timeouts, retry and other tuning options
+    private String connectTimeoutInMs;
+    private String queueSize;
+    private String cursorMaxAwaitTimeInMs;
+    private String socketTimeoutInMs;
+    private String selectionTimeoutInMs;
+    private String fieldRenames;
+    private String membersAutoDiscover;
+    private String connectMaxAttempts;
+    private String connectBackoffMaxDelayInMs;
+    private String connectBackoffInitialDelayInMs;
+    private String initialSyncMaxThreads;
+    private String sslInvalidHostnameAllowed;
+    private String sslEnabled;
+    private String pollIntervalInMs;
+    // grouped options, see the nested classes below
+    private Snapshot snapshot;
+    private Capture capture;
+    private Offset offset;
+    private History history;
+
+    /**
+     * Offset related options of the job.
+     */
+    @Data
+    public static class Offset {
+
+        private String intervalMs;
+        private String filename;
+        private String specificOffsetFile;
+        private String specificOffsetPos;
+    }
+
+    /**
+     * Snapshot mode of the job.
+     */
+    @Data
+    public static class Snapshot {
+        private String mode;
+    }
+
+    /**
+     * Capture mode of the job.
+     */
+    @Data
+    public static class Capture {
+        private String mode;
+    }
+
+    /**
+     * History file option of the job.
+     */
+    @Data
+    public static class History {
+        private String filename;
+
+    }
+
+    /**
+     * Flat representation of the mongo options; {@code JobProfileDto} parses the ext params JSON
+     * of a task config into this class before converting it to a {@link MongoJob}.
+     */
+    @Data
+    public static class MongoJobTaskConfig {
+
+        private String hosts;
+        private String user;
+        private String password;
+
+        private String databaseIncludeList;
+        private String databaseExcludeList;
+        private String collectionIncludeList;
+        private String collectionExcludeList;
+        private String fieldExcludeList;
+        private String connectTimeoutInMs;
+        private String queueSize;
+        private String cursorMaxAwaitTimeInMs;
+        private String socketTimeoutInMs;
+        private String selectionTimeoutInMs;
+        private String fieldRenames;
+        private String membersAutoDiscover;
+        private String connectMaxAttempts;
+        private String connectBackoffMaxDelayInMs;
+        private String connectBackoffInitialDelayInMs;
+        private String initialSyncMaxThreads;
+        private String sslInvalidHostnameAllowed;
+        private String sslEnabled;
+        private String pollIntervalInMs;
+
+        // flat counterparts of Snapshot.mode and Capture.mode
+        private String snapshotMode;
+        private String captureMode;
+
+        // flat counterparts of the Offset and History options
+        private String offsetFilename;
+        private String historyFilename;
+        private String specificOffsetFile;
+        private String specificOffsetPos;
+    }
+}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/GsonUtil.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/GsonUtil.java
new file mode 100644
index 000000000..5e810199f
--- /dev/null
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/GsonUtil.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.utils;
+
+import com.google.gson.Gson;
+
+/**
+ * GsonUtil : static JSON helpers backed by one shared {@link Gson} instance.
+ * Gson instances are Thread-safe, so the shared instance can be reused freely across multiple threads.
+ */
+public final class GsonUtil {
+
+    /**
+     * shared Gson instance with the default configuration
+     */
+    private static final Gson GSON = new Gson();
+
+    /**
+     * instantiation is not allowed
+     */
+    private GsonUtil() {
+        throw new UnsupportedOperationException("This is a utility class, so instantiation is not allowed");
+    }
+
+    /**
+     * This method deserializes the specified Json into an object of the specified class.
+     *
+     * @param json     json
+     * @param classOfT class of T
+     * @param <T>      T
+     * @return T
+     */
+    public static <T> T fromJson(String json, Class<T> classOfT) {
+        return GSON.fromJson(json, classOfT);
+    }
+
+    /**
+     * This method serializes the specified object into its equivalent Json representation.
+     *
+     * @param obj obj
+     * @return json content
+     */
+    public static String toJson(Object obj) {
+        return GSON.toJson(obj);
+    }
+}
diff --git a/inlong-agent/agent-plugins/pom.xml b/inlong-agent/agent-plugins/pom.xml
index 0df97e832..c00e5351f 100644
--- a/inlong-agent/agent-plugins/pom.xml
+++ b/inlong-agent/agent-plugins/pom.xml
@@ -84,6 +84,12 @@
                 </exclusion>
             </exclusions>
         </dependency>
+
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-connector-mongodb</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.postgresql</groupId>
             <artifactId>postgresql</artifactId>
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java
new file mode 100644
index 000000000..21dd01f84
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.plugin.Reader;
+import org.apache.inlong.agent.plugin.sources.reader.MongoDBReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * MongoDBSource : mongo source, which hands the whole job to a single {@link MongoDBReader}
+ */
+public class MongoDBSource extends AbstractSource {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(MongoDBSource.class);
+
+    /**
+     * Initializes the source with the job profile and creates the reader of the job.
+     *
+     * @param conf job profile
+     * @return a list holding exactly one {@link MongoDBReader}
+     */
+    @Override
+    public List<Reader> split(JobProfile conf) {
+        super.init(conf);
+        Reader mongoReader = new MongoDBReader();
+        sourceMetric.sourceSuccessCount.incrementAndGet();
+        return Collections.singletonList(mongoReader);
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
index 165e85626..21188de90 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
@@ -155,7 +155,7 @@ public class BinlogReader extends AbstractReader {
                 tryToInitAndGetHistoryPath()) + "/offset.dat" + jobConf.getInstanceId();
         binlogSnapshot = new BinlogSnapshotBase(offsetStoreFileName);
         String offset = jobConf.get(JOB_DATABASE_OFFSETS, "");
-        binlogSnapshot.save(offset);
+        binlogSnapshot.save(offset, binlogSnapshot.getFile());
 
         Properties props = getEngineProps();
         DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(io.debezium.engine.format.Json.class)
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java
new file mode 100644
index 000000000..b085e0b0f
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources.reader;
+
+import com.alibaba.fastjson.JSONPath;
+import com.google.common.base.Preconditions;
+import io.debezium.config.Configuration;
+import io.debezium.config.Field;
+import io.debezium.connector.mongodb.MongoDbConnector;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.engine.format.Json;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.constant.SnapshotModeConstants;
+import org.apache.inlong.agent.message.DefaultMessage;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
+import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.plugin.sources.snapshot.MongoDBSnapshotBase;
+import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore;
+import org.apache.inlong.agent.pojo.DebeziumFormat;
+import org.apache.inlong.agent.pojo.DebeziumOffset;
+import org.apache.inlong.agent.utils.DebeziumOffsetSerializer;
+import org.apache.inlong.agent.utils.GsonUtil;
+import org.apache.kafka.connect.storage.FileOffsetBackingStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.HOSTS;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.USER;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.PASSWORD;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.CONNECT_BACKOFF_INITIAL_DELAY_MS;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.CONNECT_BACKOFF_MAX_DELAY_MS;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.CONNECT_TIMEOUT_MS;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SOCKET_TIMEOUT_MS;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SERVER_SELECTION_TIMEOUT_MS;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.MONGODB_POLL_INTERVAL_MS;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.MAX_FAILED_CONNECTIONS;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SSL_ENABLED;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SSL_ALLOW_INVALID_HOSTNAMES;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.CURSOR_MAX_AWAIT_TIME_MS;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.DATABASE_INCLUDE_LIST;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.DATABASE_EXCLUDE_LIST;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.COLLECTION_EXCLUDE_LIST;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.FIELD_EXCLUDE_LIST;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.FIELD_RENAMES;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.MAX_COPY_THREADS;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SNAPSHOT_MODE;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.CAPTURE_MODE;
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_MAP_CAPACITY;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_HOSTS;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_USER;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_PASSWORD;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_DATABASE_INCLUDE_LIST;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_DATABASE_EXCLUDE_LIST;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_COLLECTION_INCLUDE_LIST;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_COLLECTION_EXCLUDE_LIST;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_FIELD_EXCLUDE_LIST;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SNAPSHOT_MODE;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_CAPTURE_MODE;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_QUEUE_SIZE;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_STORE_HISTORY_FILENAME;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_OFFSET_SPECIFIC_OFFSET_FILE;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_OFFSET_SPECIFIC_OFFSET_POS;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_OFFSETS;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_CONNECT_TIMEOUT_MS;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_CURSOR_MAX_AWAIT;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SOCKET_TIMEOUT;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SELECTION_TIMEOUT;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_FIELD_RENAMES;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_MEMBERS_DISCOVER;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_CONNECT_MAX_ATTEMPTS;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_BACKOFF_MAX_DELAY;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_BACKOFF_INITIAL_DELAY;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_INITIAL_SYNC_MAX_THREADS;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SSL_INVALID_HOSTNAME_ALLOWED;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SSL_ENABLE;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_POLL_INTERVAL;
+
+/**
+ * MongoDBReader : mongo reader, serves the change events of an embedded debezium engine from a buffer queue
+ */
+public class MongoDBReader extends AbstractReader {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(MongoDBReader.class);
+
+    private String instanceId;
+    private String offsetStoreFileName;
+    private String specificOffsetFile;
+    private String specificOffsetPos;
+    private boolean finished = false;
+    private boolean destroyed = false;
+
+    private ExecutorService executor;
+    /**
+     * mongo snapshot info <br/>
+     * Currently, there is no usage scenario
+     */
+    private MongoDBSnapshotBase snapshot;
+    /**
+     * message buffer queue
+     */
+    private LinkedBlockingQueue<Pair<String, DebeziumFormat>> bufferPool;
+
+    /**
+     * Returns the next buffered message, or {@code null} when the buffer is currently empty.
+     * The plugin read counter is only incremented when the buffer is not empty.
+     *
+     * @return the next message or {@code null}
+     */
+    @Override
+    public Message read() {
+        if (bufferPool.isEmpty()) {
+            return null;
+        }
+        super.readerMetric.pluginReadCount.incrementAndGet();
+        return this.pollMessage();
+    }
+
+    /**
+     * @return {@code true} once {@link #finishRead()} has been called
+     */
+    @Override
+    public boolean isFinished() {
+        return this.finished;
+    }
+
+    /**
+     * @return the instance id of the job, which identifies the source this reader reads from
+     */
+    @Override
+    public String getReadSource() {
+        return this.instanceId;
+    }
+
+    /**
+     * No-op: the given timeout is ignored by this reader.
+     */
+    @Override
+    public void setReadTimeout(long mills) {
+    }
+
+    /**
+     * No-op: the given wait time is ignored by this reader.
+     */
+    @Override
+    public void setWaitMillisecond(long millis) {
+    }
+
+    /**
+     * @return the snapshot reported by the snapshot holder,
+     *         or an empty string when no snapshot holder has been created
+     */
+    @Override
+    public String getSnapshot() {
+        return snapshot == null ? "" : snapshot.getSnapshot();
+    }
+
+    /**
+     * Marks this reader as finished, after which {@link #isFinished()} returns {@code true}.
+     */
+    @Override
+    public void finishRead() {
+        finished = true;
+    }
+
+    /**
+     * @return always {@code true}; no existence check is performed for the mongo source
+     */
+    @Override
+    public boolean isSourceExist() {
+        return true;
+    }
+
+    /**
+     * Stops the engine executor and closes the snapshot holder; only the first successful call
+     * has an effect.
+     * <p>
+     * Both resources are created during {@link #init(JobProfile)}. When init never ran or failed
+     * part-way, either of them may still be {@code null}, so each one is released independently.
+     */
+    @Override
+    public void destroy() {
+        synchronized (this) {
+            if (destroyed) {
+                return;
+            }
+            if (executor != null) {
+                executor.shutdownNow();
+            }
+            if (snapshot != null) {
+                snapshot.close();
+            }
+            destroyed = true;
+        }
+    }
+
+    /**
+     * Initializes the reader: runs the parent initialization, derives the reader state from the
+     * job profile and finally starts the embedded debezium engine.
+     *
+     * @param jobConf job conf
+     */
+    @Override
+    public void init(JobProfile jobConf) {
+        super.init(jobConf);
+        setGlobalParamsValue(jobConf);
+        startEmbeddedDebeziumEngine(jobConf);
+    }
+
+    /**
+     * Takes the head of the buffer pool and converts it into a message: the value is serialized
+     * to JSON (UTF-8) as the body, the key is stored in the header under {@code PROXY_KEY_DATA}.
+     *
+     * @return the converted message, or {@code null} if the buffer pool is empty
+     */
+    private Message pollMessage() {
+        // poll() does not block: it yields null instead of waiting when nothing is queued
+        Pair<String, DebeziumFormat> entry = bufferPool.poll();
+        if (Objects.isNull(entry)) {
+            return null;
+        }
+        Map<String, String> header = new HashMap<>(DEFAULT_MAP_CAPACITY);
+        header.put(PROXY_KEY_DATA, entry.getKey());
+        byte[] body = GsonUtil.toJson(entry.getValue()).getBytes(StandardCharsets.UTF_8);
+        return new DefaultMessage(body, header);
+    }
+
+    /**
+     * Derives the reader state from the job profile: the bounded buffer queue, the instance id,
+     * the offset file and its snapshot holder, and the specific offset settings.
+     *
+     * @param jobConf job conf
+     */
+    private void setGlobalParamsValue(JobProfile jobConf) {
+        // bounded queue; 1000 is passed as the fallback capacity
+        int queueCapacity = jobConf.getInt(JOB_MONGO_QUEUE_SIZE, 1000);
+        this.bufferPool = new LinkedBlockingQueue<>(queueCapacity);
+        this.instanceId = jobConf.getInstanceId();
+
+        // offset file absolute path: <configured or snapshot dir>/mongo-<instanceId>-offset.dat
+        String offsetDir = jobConf.get(JOB_MONGO_STORE_HISTORY_FILENAME,
+                MongoDBSnapshotBase.getSnapshotFilePath());
+        this.offsetStoreFileName = offsetDir + "/mongo-" + instanceId + "-offset.dat";
+
+        // snapshot info: the offsets configured for the job are saved into the offset file
+        this.snapshot = new MongoDBSnapshotBase(offsetStoreFileName);
+        String configuredOffsets = jobConf.get(JOB_MONGO_OFFSETS, "");
+        this.snapshot.save(configuredOffsets, new File(offsetStoreFileName));
+
+        // offset info
+        this.specificOffsetFile = jobConf.get(JOB_MONGO_OFFSET_SPECIFIC_OFFSET_FILE, "");
+        this.specificOffsetPos = jobConf.get(JOB_MONGO_OFFSET_SPECIFIC_OFFSET_POS, "-1");
+    }
+
+    /**
+     * Builds the embedded debezium engine (JSON change events) from the connector config of the
+     * job and runs it on a dedicated single-thread executor. Change events are passed to
+     * {@code handleChangeEvent}, the completion of the engine to {@code handle}.
+     *
+     * @param jobConf job conf
+     */
+    private void startEmbeddedDebeziumEngine(JobProfile jobConf) {
+        DebeziumEngine.Builder<ChangeEvent<String, String>> engineBuilder = DebeziumEngine.create(Json.class);
+        engineBuilder = engineBuilder.using(this.buildMongoConnectorConfig(jobConf));
+        engineBuilder = engineBuilder.notifying(this::handleChangeEvent);
+        engineBuilder = engineBuilder.using(this::handle);
+        DebeziumEngine<ChangeEvent<String, String>> engine = engineBuilder.build();
+
+        // the executor is only created once the engine has been built successfully
+        executor = Executors.newSingleThreadExecutor();
+        executor.execute(engine);
+    }
+
+    /**
+     * Handle the completion of the embedded connector engine.
+     *
+     * @param success {@code true} if the connector completed normally,
+     *                or {@code false} if the connector produced an error
+     *                that prevented startup or caused premature termination.
+     * @param message the completion message; never null
+     * @param error   the error, or null if there was no exception
+     */
+    private void handle(boolean success, String message, Throwable error) {
+        // one placeholder fewer than arguments: the trailing Throwable is then logged as the exception
+        if (!success) {
+            LOGGER.error("mongo job {} debezium engine stopped: {}", instanceId, message, error);
+        }
+    }
+
+    /**
+     * Translate the job configuration into the {@link Properties} the embedded Debezium engine is started with.
+     * @param jobConf job conf the connector settings are read from
+     * @return engine properties: connector settings, connector class, engine name and offset storage
+     */
+    private Properties buildMongoConnectorConfig(JobProfile jobConf) {
+        Configuration.Builder builder = Configuration.create();
+        setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_HOSTS, HOSTS);
+        setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_USER, USER);
+        setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_PASSWORD, PASSWORD);
+        setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_DATABASE_INCLUDE_LIST, DATABASE_INCLUDE_LIST);
+        setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_DATABASE_EXCLUDE_LIST, DATABASE_EXCLUDE_LIST);
+        setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_COLLECTION_INCLUDE_LIST, COLLECTION_INCLUDE_LIST);
+        setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_COLLECTION_EXCLUDE_LIST, COLLECTION_EXCLUDE_LIST);
+        setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_FIELD_EXCLUDE_LIST, FIELD_EXCLUDE_LIST);
+        setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_SNAPSHOT_MODE, SNAPSHOT_MODE);
+        setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_CAPTURE_MODE, CAPTURE_MODE);
+        setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_CONNECT_TIMEOUT_MS, CONNECT_TIMEOUT_MS);
+        setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_CURSOR_MAX_AWAIT, CURSOR_MAX_AWAIT_TIME_MS);
+        setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_SOCKET_TIMEOUT, SOCKET_TIMEOUT_MS);
+        setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_SELECTION_TIMEOUT, SERVER_SELECTION_TIMEOUT_MS);
+        setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_FIELD_RENAMES, FIELD_RENAMES);
+        setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_MEMBERS_DISCOVER, AUTO_DISCOVER_MEMBERS);
+        setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_CONNECT_MAX_ATTEMPTS, MAX_FAILED_CONNECTIONS);
+        setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_BACKOFF_MAX_DELAY, CONNECT_BACKOFF_MAX_DELAY_MS);
+        setEngineConfigIfNecessary(jobConf, builder,
+                JOB_MONGO_BACKOFF_INITIAL_DELAY, CONNECT_BACKOFF_INITIAL_DELAY_MS);
+        setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_INITIAL_SYNC_MAX_THREADS, MAX_COPY_THREADS);
+        setEngineConfigIfNecessary(jobConf, builder,
+                JOB_MONGO_SSL_INVALID_HOSTNAME_ALLOWED, SSL_ALLOW_INVALID_HOSTNAMES);
+        setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_SSL_ENABLE, SSL_ENABLED);
+        setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_POLL_INTERVAL, MONGODB_POLL_INTERVAL_MS);
+        Properties props = builder.build().asProperties();
+        props.setProperty("offset.storage.file.filename", offsetStoreFileName);
+        props.setProperty("connector.class", MongoDbConnector.class.getCanonicalName());
+        props.setProperty("name", instanceId);
+        // NOTE(review): props are keyed by Debezium field names, so this job-level key looks like it never matches
+        String snapshotMode = props.getOrDefault(JOB_MONGO_SNAPSHOT_MODE, "").toString();
+        if (Objects.equals(SnapshotModeConstants.INITIAL, snapshotMode)) {
+            props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName());
+            props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, serializeOffset());
+        } else {
+            props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName());
+        }
+        // log a copy with the password masked so the credential never reaches the log file
+        Properties loggable = (Properties) props.clone();
+        loggable.computeIfPresent(PASSWORD.name(), (key, value) -> "******");
+        LOGGER.info("mongo job {} start with props {}", jobConf.getInstanceId(), GsonUtil.toJson(loggable));
+        return props;
+    }
+
+    /** Copy job setting {@code key}, or the default of {@code field}, into the builder unless it is blank. */
+    private void setEngineConfigIfNecessary(JobProfile jobConf,
+                                            Configuration.Builder builder, String key, Field field) {
+        String configured = jobConf.get(key, field.defaultValueAsString());
+        if (!StringUtils.isBlank(configured)) {
+            builder.with(field, configured);
+        }
+    }
+
+    /** Serialize the configured specific offset (file, pos) into a UTF-8 string; empty if serialization fails. */
+    private String serializeOffset() {
+        // guard the configured values (not the constant key names, which can never be null)
+        Preconditions.checkNotNull(specificOffsetFile, JOB_MONGO_OFFSET_SPECIFIC_OFFSET_FILE + " cannot be null");
+        Preconditions.checkNotNull(specificOffsetPos, JOB_MONGO_OFFSET_SPECIFIC_OFFSET_POS + " cannot be null");
+        Map<String, Object> sourceOffset = new HashMap<>();
+        sourceOffset.put("file", specificOffsetFile);
+        sourceOffset.put("pos", specificOffsetPos);
+        Map<String, String> sourcePartition = new HashMap<>();
+        sourcePartition.put("server", instanceId);
+        DebeziumOffset specificOffset = new DebeziumOffset();
+        specificOffset.setSourceOffset(sourceOffset);
+        specificOffset.setSourcePartition(sourcePartition);
+        try {
+            byte[] serialized = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
+            return new String(serialized, StandardCharsets.UTF_8);
+        } catch (IOException e) {
+            LOGGER.error("serialize offset message error", e);
+            return "";
+        }
+    }
+
+    /**
+     * Handles a batch of records, calling the {@link DebeziumEngine.RecordCommitter#markProcessed(Object)}
+     * for each record and {@link DebeziumEngine.RecordCommitter#markBatchFinished()} when this batch is finished.
+     *
+     * @param records   the records to be processed; each payload is parsed and put into the buffer
+     * @param committer the committer that indicates to the system that we are finished
+     */
+    private void handleChangeEvent(List<ChangeEvent<String, String>> records,
+                                   DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> committer) {
+        try {
+            for (ChangeEvent<String, String> record : records) {
+                DebeziumFormat debeziumFormat = JSONPath.read(record.value(), "$.payload", DebeziumFormat.class);
+                bufferPool.put(Pair.of(debeziumFormat.getSource().getCollection(), debeziumFormat));
+                committer.markProcessed(record);
+            }
+            committer.markBatchFinished();
+            AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, super.inlongGroupId, super.inlongStreamId,
+                    System.currentTimeMillis(), records.size());
+            readerMetric.pluginReadCount.addAndGet(records.size());
+        } catch (InterruptedException e) {
+            // restore the interrupt flag so the thread running the engine can still observe the interruption
+            Thread.currentThread().interrupt();
+            LOGGER.error("parse mongo message error", e);
+            readerMetric.pluginReadFailCount.addAndGet(records.size());
+        }
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java
index 5d3e9207c..5d78c2787 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java
@@ -146,7 +146,7 @@ public class PostgreSQLReader extends AbstractReader {
         specificOffsetFile = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, "");
         specificOffsetPos = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, "-1");
         postgreSQLSnapshot = new PostgreSQLSnapshotBase(offsetStoreFileName);
-        postgreSQLSnapshot.save(offset);
+        postgreSQLSnapshot.save(offset, postgreSQLSnapshot.getFile());
 
         Properties props = getEngineProps();
 
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/PostgreSQLSnapshotBase.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/AbstractSnapshot.java
similarity index 69%
copy from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/PostgreSQLSnapshotBase.java
copy to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/AbstractSnapshot.java
index d27213389..8b100149d 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/PostgreSQLSnapshotBase.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/AbstractSnapshot.java
@@ -22,55 +22,49 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.OutputStream;
+import java.io.ByteArrayOutputStream;
 import java.nio.file.Files;
 import java.util.Base64;
 
 /**
- * PostgreSQL Snapshot
+ * AbstractSnapshot : shared file-based load and save logic for snapshot implementations
  */
-public class PostgreSQLSnapshotBase implements SnapshotBase {
-
-    public static final int BUFFER_SIZE = 1024;
-    public static final int START_OFFSET = 0;
-    private static final Logger LOGGER = LoggerFactory.getLogger(PostgreSQLSnapshotBase.class);
-    private final Base64.Decoder decoder = Base64.getDecoder();
-    private final Base64.Encoder encoder = Base64.getEncoder();
-    private File file;
-    private byte[] offset;
+public abstract class AbstractSnapshot implements SnapshotBase {
 
-    public PostgreSQLSnapshotBase(String filePath) {
-        file = new File(filePath);
-    }
-
-    @Override
-    public String getSnapshot() {
-        load();
-        return encoder.encodeToString(offset);
-    }
+    private static final Logger LOGGER = LoggerFactory.getLogger(SnapshotBase.class);
 
-    @Override
-    public void close() {
+    protected static final Base64.Decoder DECODER = Base64.getDecoder();
+    protected static final Base64.Encoder ENCODER = Base64.getEncoder();
 
-    }
+    /**
+     * buffer size used for reading and writing
+     */
+    private static final int BUFFER_SIZE = 8192;
+    /**
+     * start offset
+     */
+    private static final int START_OFFSET = 0;
 
     /**
-     * Load postgres offset from local file
+     * Load the file contents from the specified path
+     *
+     * @param file file
+     * @return file content
      */
-    private void load() {
+    public byte[] load(File file) {
         try {
             if (!file.exists()) {
                 // if parentDir not exist, create first
                 File parentDir = file.getParentFile();
                 if (parentDir == null) {
                     LOGGER.info("no parent dir, file:{}", file.getAbsolutePath());
-                    return;
+                    return new byte[0];
                 }
                 if (!parentDir.exists()) {
-                    boolean success = parentDir.mkdir();
+                    boolean success = parentDir.mkdirs();
                     LOGGER.info("create dir {} result {}", parentDir, success);
                 }
                 file.createNewFile();
@@ -83,23 +77,27 @@ public class PostgreSQLSnapshotBase implements SnapshotBase {
             while ((len = inputStream.read(buf)) != -1) {
                 outputStream.write(buf, START_OFFSET, len);
             }
-            offset = outputStream.toByteArray();
             inputStream.close();
             outputStream.close();
+            return outputStream.toByteArray();
         } catch (Throwable ex) {
-            LOGGER.error("load PostgreSQL WAL log error", ex);
+            LOGGER.error("load binlog offset error", ex);
             ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex);
+            return null;
         }
     }
 
     /**
-     * Save PostgreSQL offset to local file
+     * offset persist
+     *
+     * @param snapshot Contents of the file to be written back
+     * @param destFile Target file
      */
-    public void save(String snapshot) {
-        byte[] bytes = decoder.decode(snapshot);
+    public void save(String snapshot, File destFile) {
+        byte[] bytes = DECODER.decode(snapshot);
         if (bytes.length != 0) {
-            offset = bytes;
-            try (OutputStream output = Files.newOutputStream(file.toPath())) {
+            //offset = bytes;
+            try (OutputStream output = Files.newOutputStream(destFile.toPath())) {
                 output.write(bytes);
             } catch (Throwable e) {
                 LOGGER.error("save offset to file error", e);
@@ -107,4 +105,5 @@ public class PostgreSQLSnapshotBase implements SnapshotBase {
             }
         }
     }
+
 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/BinlogSnapshotBase.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/BinlogSnapshotBase.java
index d0b9ca230..d19b4f61c 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/BinlogSnapshotBase.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/BinlogSnapshotBase.java
@@ -17,32 +17,19 @@
 
 package org.apache.inlong.agent.plugin.sources.snapshot;
 
-import org.apache.inlong.agent.utils.ThreadUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.OutputStream;
-import java.util.Base64;
-import java.util.Base64.Decoder;
-import java.util.Base64.Encoder;
 
 /**
  * binlog snapshot
  */
-public class BinlogSnapshotBase implements SnapshotBase {
+public class BinlogSnapshotBase extends AbstractSnapshot {
 
-    public static final int BUFFER_SIZE = 1024;
-    public static final int START_OFFSET = 0;
     private static final Logger log = LoggerFactory.getLogger(BinlogSnapshotBase.class);
-    private final Decoder decoder = Base64.getDecoder();
-    private final Encoder encoder = Base64.getEncoder();
-    private File file;
-    private byte[] offset;
+
+    private final File file;
 
     public BinlogSnapshotBase(String filePath) {
         file = new File(filePath);
@@ -50,64 +37,16 @@ public class BinlogSnapshotBase implements SnapshotBase {
 
     @Override
     public String getSnapshot() {
-        load();
-        return encoder.encodeToString(offset);
+        byte[] offset = this.load(this.file);
+        return ENCODER.encodeToString(offset);
     }
 
     @Override
     public void close() {
     }
 
-    /**
-     * load binlog offset from local file
-     */
-    public void load() {
-        try {
-            if (!file.exists()) {
-                // if parentDir not exist, create first
-                File parentDir = file.getParentFile();
-                if (parentDir == null) {
-                    log.info("no parent dir, file:{}", file.getAbsolutePath());
-                    return;
-                }
-                if (!parentDir.exists()) {
-                    boolean success = parentDir.mkdirs();
-                    log.info("create dir {} result {}", parentDir, success);
-                }
-                file.createNewFile();
-            }
-            FileInputStream fis = new FileInputStream(file);
-            BufferedInputStream inputStream = new BufferedInputStream(fis);
-            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-            int len;
-            byte[] buf = new byte[BUFFER_SIZE];
-            while ((len = inputStream.read(buf)) != -1) {
-                outputStream.write(buf, START_OFFSET, len);
-            }
-            offset = outputStream.toByteArray();
-            inputStream.close();
-            outputStream.close();
-        } catch (Throwable ex) {
-            log.error("load binlog offset error", ex);
-            ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex);
-        }
-    }
-
-    /**
-     * save binlog offset to local file
-     */
-    public void save(String snapshot) {
-        byte[] bytes = decoder.decode(snapshot);
-        if (bytes.length != 0) {
-            offset = bytes;
-            try (OutputStream output = new FileOutputStream(file)) {
-                output.write(bytes);
-            } catch (Throwable e) {
-                log.error("save offset to file error", e);
-                ThreadUtils.threadThrowableHandler(Thread.currentThread(), e);
-
-            }
-        }
+    public File getFile() {
+        return file;
     }
 
 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/MongoDBSnapshotBase.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/MongoDBSnapshotBase.java
new file mode 100644
index 000000000..82108b208
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/MongoDBSnapshotBase.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources.snapshot;
+
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+/**
+ * MongoDBSnapshotBase : snapshot of the MongoDB offset file, exposed as Base64 text
+ */
+public class MongoDBSnapshotBase extends AbstractSnapshot {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(MongoDBSnapshotBase.class);
+    /** agent configuration */
+    private static final AgentConfiguration AGENT_CONFIGURATION = AgentConfiguration.getAgentConf();
+    /** snapshot file */
+    private final File file;
+
+    public MongoDBSnapshotBase(String filePath) {
+        file = new File(filePath);
+    }
+
+    /** @return Base64 text of the snapshot file, or an empty string when it could not be loaded */
+    @Override
+    public String getSnapshot() {
+        // load(File) yields null after a read failure; encoding null would throw a NullPointerException
+        byte[] content = this.load(file);
+        return content == null ? "" : ENCODER.encodeToString(content);
+    }
+
+    @Override
+    public void close() {
+        // nothing to release
+    }
+
+    /** @return absolute path of the directory AgentUtils resolves from the history path and agent home */
+    public static String getSnapshotFilePath() {
+        String historyPath = AGENT_CONFIGURATION.get(
+                AgentConstants.AGENT_HISTORY_PATH, AgentConstants.DEFAULT_AGENT_HISTORY_PATH);
+        String parentPath = AGENT_CONFIGURATION.get(
+                AgentConstants.AGENT_HOME, AgentConstants.DEFAULT_AGENT_HOME);
+        return AgentUtils.makeDirsIfNotExist(historyPath, parentPath).getAbsolutePath();
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/PostgreSQLSnapshotBase.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/PostgreSQLSnapshotBase.java
index d27213389..8aee4c16f 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/PostgreSQLSnapshotBase.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/PostgreSQLSnapshotBase.java
@@ -17,30 +17,18 @@
 
 package org.apache.inlong.agent.plugin.sources.snapshot;
 
-import org.apache.inlong.agent.utils.ThreadUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.OutputStream;
-import java.nio.file.Files;
-import java.util.Base64;
 
 /**
  * PostgreSQL Snapshot
  */
-public class PostgreSQLSnapshotBase implements SnapshotBase {
+public class PostgreSQLSnapshotBase extends AbstractSnapshot {
 
-    public static final int BUFFER_SIZE = 1024;
-    public static final int START_OFFSET = 0;
     private static final Logger LOGGER = LoggerFactory.getLogger(PostgreSQLSnapshotBase.class);
-    private final Base64.Decoder decoder = Base64.getDecoder();
-    private final Base64.Encoder encoder = Base64.getEncoder();
-    private File file;
-    private byte[] offset;
+    private final File file;
 
     public PostgreSQLSnapshotBase(String filePath) {
         file = new File(filePath);
@@ -48,8 +36,8 @@ public class PostgreSQLSnapshotBase implements SnapshotBase {
 
     @Override
     public String getSnapshot() {
-        load();
-        return encoder.encodeToString(offset);
+        byte[] offset = this.load(this.file);
+        return ENCODER.encodeToString(offset);
     }
 
     @Override
@@ -57,54 +45,8 @@ public class PostgreSQLSnapshotBase implements SnapshotBase {
 
     }
 
-    /**
-     * Load postgres offset from local file
-     */
-    private void load() {
-        try {
-            if (!file.exists()) {
-                // if parentDir not exist, create first
-                File parentDir = file.getParentFile();
-                if (parentDir == null) {
-                    LOGGER.info("no parent dir, file:{}", file.getAbsolutePath());
-                    return;
-                }
-                if (!parentDir.exists()) {
-                    boolean success = parentDir.mkdir();
-                    LOGGER.info("create dir {} result {}", parentDir, success);
-                }
-                file.createNewFile();
-            }
-            FileInputStream fis = new FileInputStream(file);
-            BufferedInputStream inputStream = new BufferedInputStream(fis);
-            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-            int len;
-            byte[] buf = new byte[BUFFER_SIZE];
-            while ((len = inputStream.read(buf)) != -1) {
-                outputStream.write(buf, START_OFFSET, len);
-            }
-            offset = outputStream.toByteArray();
-            inputStream.close();
-            outputStream.close();
-        } catch (Throwable ex) {
-            LOGGER.error("load PostgreSQL WAL log error", ex);
-            ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex);
-        }
+    public File getFile() {
+        return file;
     }
 
-    /**
-     * Save PostgreSQL offset to local file
-     */
-    public void save(String snapshot) {
-        byte[] bytes = decoder.decode(snapshot);
-        if (bytes.length != 0) {
-            offset = bytes;
-            try (OutputStream output = Files.newOutputStream(file.toPath())) {
-                output.write(bytes);
-            } catch (Throwable e) {
-                LOGGER.error("save offset to file error", e);
-                ThreadUtils.threadThrowableHandler(Thread.currentThread(), e);
-            }
-        }
-    }
 }
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/PostgreSQLOffsetManagerTest.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/PostgreSQLOffsetManagerTest.java
index 63cb8ae93..658a27f0c 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/PostgreSQLOffsetManagerTest.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/PostgreSQLOffsetManagerTest.java
@@ -58,7 +58,7 @@ public class PostgreSQLOffsetManagerTest {
         byte[] snapshotBytes = new byte[]{-65,-14,23};
         final Base64 base64 = new Base64();
         String encodeSnapshot = base64.encodeAsString(snapshotBytes);
-        snapshotManager.save(encodeSnapshot);
+        snapshotManager.save(encodeSnapshot, snapshotManager.getFile());
         Assert.assertEquals(snapshotManager.getSnapshot(),encodeSnapshot);
         File file = new File(filePath.toString());
         Assert.assertEquals(file.exists(),true);
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestBinlogOffsetManager.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestBinlogOffsetManager.java
index 45d94632e..4d6f262d4 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestBinlogOffsetManager.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestBinlogOffsetManager.java
@@ -52,7 +52,7 @@ public class TestBinlogOffsetManager {
         byte[] snapshotBytes = new byte[]{-65, -14, -23};
         final Base64 base64 = new Base64();
         String encodeSnapshot = base64.encodeAsString(snapshotBytes);
-        snapshotManager.save(encodeSnapshot);
+        snapshotManager.save(encodeSnapshot, snapshotManager.getFile());
         Assert.assertEquals(snapshotManager.getSnapshot(), encodeSnapshot);
     }
 
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMongoDBReader.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMongoDBReader.java
new file mode 100644
index 000000000..9f5a06f45
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMongoDBReader.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import com.alibaba.fastjson.JSONPath;
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.constant.JobConstants;
+import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.plugin.sources.reader.MongoDBReader;
+import org.apache.inlong.agent.pojo.DebeziumFormat;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
+
+/**
+ * TestMongoDBReader : checks the MongoDB change event format handled by MongoDBReader
+ */
+public class TestMongoDBReader {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TestMongoDBReader.class);
+
+    /**
+     * Change event format verification
+     */
+    @Test
+    public void testDebeziumFormat() {
+        String json = "{\n"
+                + "    \"payload\": {\n"
+                + "        \"after\": \"{\\\"_id\\\":{\\\"$oid\\\":\\\"6304d21d96befa25a3630c5e\\\"}"
+                + ",\\\"orderId\\\":\\\"o0011\\\",\\\"userId\\\":\\\"u0012\\\",\\\"items\\\":"
+                + "[{\\\"itemId2\\\":\\\"i001\\\",\\\"itemName\\\":\\\"yyy\\\"},"
+                + "{\\\"itemId\\\":\\\"i002\\\",\\\"itemName\\\":\\\"yyy2\\\"}]}\",\n"
+                + "        \"patch\": null,\n"
+                + "        \"filter\": null,\n"
+                + "        \"updateDescription\": {\n"
+                + "            \"removedFields\": null,\n"
+                + "            \"updatedFields\": \"{\\\"items\\\":[{\\\"itemId2\\\":\\\"i001\\\", "
+                + "\\\"itemName\\\":\\\"xxx\\\"}, {\\\"itemId\\\":\\\"i002\\\", \\\"itemName\\\":\\\"xxx2\\\"}], "
+                + "\\\"userId\\\":\\\"u0012\\\"}\",\n"
+                + "            \"truncatedArrays\": null\n"
+                + "        },\n"
+                + "        \"source\": {\n"
+                + "            \"version\": \"1.8.0.Final\",\n"
+                + "            \"connector\": \"mongodb\",\n"
+                + "            \"name\": \"myrs\",\n"
+                + "            \"ts_ms\": 1661332000000,\n"
+                + "            \"snapshot\": \"false\",\n"
+                + "            \"db\": \"mall\",\n"
+                + "            \"sequence\": null,\n"
+                + "            \"rs\": \"myrs\",\n"
+                + "            \"collection\": \"order\",\n"
+                + "            \"ord\": 1,\n"
+                + "            \"h\": null,\n"
+                + "            \"tord\": null,\n"
+                + "            \"stxnid\": null,\n"
+                + "            \"lsid\": null,\n"
+                + "            \"txnNumber\": null\n"
+                + "        },\n"
+                + "        \"op\": \"u\",\n"
+                + "        \"ts_ms\": 1661332000257,\n"
+                + "        \"transaction\": null\n"
+                + "    }\n"
+                + "}";
+        DebeziumFormat debeziumFormat = JSONPath.read(json, "$.payload", DebeziumFormat.class);
+        Assert.assertEquals("order", debeziumFormat.getSource().getCollection());
+        Assert.assertEquals("false", debeziumFormat.getSource().getSnapshot());
+    }
+
+    /**
+     * Manual check against a local Mongo shard cluster; it loops forever, therefore ignored by default
+     */
+    @Test
+    @Ignore
+    public void readChangeEventFromMongo() {
+        JobProfile jobProfile = new JobProfile();
+        jobProfile.set("job.mongoJob.hosts", "localhost:37018");
+        jobProfile.set("job.mongoJob.user", "mongo");
+        jobProfile.set("job.mongoJob.password", "root");
+        jobProfile.set("job.mongoJob.name", "myrs");
+        jobProfile.set("job.mongoJob.connectMaxAttempts", "3");
+        jobProfile.set("job.mongoJob.databaseIncludeList", "mall");
+        jobProfile.set("job.mongoJob.collectionIncludeList", "order");
+        jobProfile.set("job.mongoJob.snapshotMode", "never");
+        jobProfile.set(JobConstants.JOB_INSTANCE_ID, UUID.randomUUID().toString());
+
+        jobProfile.set(PROXY_INLONG_GROUP_ID, UUID.randomUUID().toString());
+        jobProfile.set(PROXY_INLONG_STREAM_ID, UUID.randomUUID().toString());
+        MongoDBReader mongoReader = new MongoDBReader();
+        mongoReader.init(jobProfile);
+        while (true) {
+            Message message = mongoReader.read();
+            if (message != null) {
+                LOGGER.info("event content: {}", message);
+            }
+        }
+    }
+}
diff --git a/licenses/inlong-agent/LICENSE b/licenses/inlong-agent/LICENSE
index 9b18e8655..04b79a5bd 100644
--- a/licenses/inlong-agent/LICENSE
+++ b/licenses/inlong-agent/LICENSE
@@ -368,6 +368,7 @@ The text of each license is the standard Apache 2.0 license.
   io.debezium:debezium-api:1.8.0.Final - Debezium API (https://github.com/debezium/debezium/tree/v1.8.0.Final), (The Apache Software License, Version 2.0)
   io.debezium:debezium-connector-mysql:1.8.0.Final - Debezium Connector for MySQL (https://github.com/debezium/debezium/tree/v1.8.0.Final), (The Apache Software License, Version 2.0)
   io.debezium:debezium-connector-postgres:1.8.0.Final - Debezium Connector for PostgreSQL (https://github.com/debezium/debezium/tree/v1.8.0.Final), (The Apache Software License, Version 2.0)
+  io.debezium:debezium-connector-mongodb:1.8.0.Final - Debezium connector for MongoDB (https://github.com/debezium/debezium/tree/v1.8.0.Final), (The Apache Software License, Version 2.0)
   io.debezium:debezium-core:1.8.0.Final - Debezium Core (https://github.com/debezium/debezium/tree/v1.8.0.Final), (The Apache Software License, Version 2.0)
   io.debezium:debezium-ddl-parser:1.8.0.Final - Debezium ANTLR DDL parsers (https://github.com/debezium/debezium/tree/v1.8.0.Final), (The Apache Software License, Version 2.0)
   io.debezium:debezium-embedded:1.8.0.Final - Debezium Embedded (https://github.com/debezium/debezium/tree/v1.8.0.Final), (The Apache Software License, Version 2.0)
diff --git a/pom.xml b/pom.xml
index 99819cc8d..943a3c0ea 100644
--- a/pom.xml
+++ b/pom.xml
@@ -603,6 +603,13 @@
                 <artifactId>debezium-connector-mysql</artifactId>
                 <version>${debezium.version}</version>
             </dependency>
+
+            <dependency>
+                <groupId>io.debezium</groupId>
+                <artifactId>debezium-connector-mongodb</artifactId>
+                <version>${debezium.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>io.debezium</groupId>
                 <artifactId>debezium-connector-postgres</artifactId>