You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/20 11:00:03 UTC

[pulsar] branch master updated: [feature][pulsar-io-mongo] Add support for full message synchronization (#16003)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cda2ea7cb08 [feature][pulsar-io-mongo] Add support for full message synchronization (#16003)
cda2ea7cb08 is described below

commit cda2ea7cb0814f5bb6dc6dd829ea6719822d2555
Author: Yuanhao Ji <ji...@apache.org>
AuthorDate: Tue Sep 20 18:59:54 2022 +0800

    [feature][pulsar-io-mongo] Add support for full message synchronization (#16003)
    
    ### Motivation
    
    Currently, the MongoDB source connector supports only incremental message synchronization.
    This PR adds support for full message synchronization.
    
    Since MongoDB 4.0, we can set the starting point of the change stream with the `startAtOperationTime` field.
    So, we can set it to `0` to make the change stream start at the earliest point.
    See https://www.mongodb.com/docs/v4.2/reference/method/db.collection.watch/ for more information.
---
 .../pulsar/io/mongodb/MongoAbstractConfig.java     | 102 +++++++++++++
 .../org/apache/pulsar/io/mongodb/MongoConfig.java  | 105 --------------
 .../org/apache/pulsar/io/mongodb/MongoSink.java    |  18 +--
 .../apache/pulsar/io/mongodb/MongoSinkConfig.java  |  75 ++++++++++
 .../org/apache/pulsar/io/mongodb/MongoSource.java  |  49 ++++---
 .../pulsar/io/mongodb/MongoSourceConfig.java       | 108 ++++++++++++++
 .../org/apache/pulsar/io/mongodb/SyncType.java}    |  44 ++----
 .../resources/META-INF/services/pulsar-io.yaml     |   4 +-
 .../apache/pulsar/io/mongodb/MongoConfigTest.java  |  87 ------------
 .../pulsar/io/mongodb/MongoSinkConfigTest.java     | 113 +++++++++++++++
 .../apache/pulsar/io/mongodb/MongoSinkTest.java    |   2 +-
 .../pulsar/io/mongodb/MongoSourceConfigTest.java   | 104 ++++++++++++++
 .../apache/pulsar/io/mongodb/MongoSourceTest.java  |   2 +-
 .../org/apache/pulsar/io/mongodb/TestHelper.java   |  52 +++++--
 .../mongo/src/test/resources/mongoSinkConfig.yaml  |   2 +-
 ...mongoSinkConfig.yaml => mongoSourceConfig.yaml} |   3 +-
 site2/docs/io-connectors.md                        |   8 +-
 site2/docs/io-mongo-source.md                      |  55 ++++++++
 tests/integration/pom.xml                          |  14 ++
 .../integration/io/sources/MongoSourceTester.java  | 157 +++++++++++++++++++++
 20 files changed, 839 insertions(+), 265 deletions(-)

diff --git a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoAbstractConfig.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoAbstractConfig.java
new file mode 100644
index 00000000000..1c9786ea3d7
--- /dev/null
+++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoAbstractConfig.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.mongodb;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.io.Serializable;
+import lombok.Data;
+import lombok.experimental.Accessors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+/**
+ * Configuration object for all MongoDB components.
+ */
+@Data
+@Accessors(chain = true)
+public abstract class MongoAbstractConfig implements Serializable {
+
+    private static final long serialVersionUID = -3830568531897300005L;
+
+    public static final int DEFAULT_BATCH_SIZE = 100;
+
+    public static final long DEFAULT_BATCH_TIME_MS = 1000;
+
+    @FieldDoc(
+            required = true,
+            defaultValue = "",
+            help = "The URI of MongoDB that the connector connects to "
+                    + "(see: https://docs.mongodb.com/manual/reference/connection-string/)"
+    )
+    private final String mongoUri;
+
+    @FieldDoc(
+            defaultValue = "",
+            help = "The database name to which the collection belongs "
+                    + "and which must be watched for the source connector "
+                    + "(required for the sink connector)"
+    )
+    private final String database;
+
+    @FieldDoc(
+            defaultValue = "",
+            help = "The collection name where the messages are written "
+                    + "or which is watched for the source connector "
+                    + "(required for the sink connector)"
+    )
+    private final String collection;
+
+    @FieldDoc(
+            defaultValue = "" + DEFAULT_BATCH_SIZE,
+            help = "The batch size of write to or read from the database"
+    )
+    private final int batchSize;
+
+    @FieldDoc(
+            defaultValue = "" + DEFAULT_BATCH_TIME_MS,
+            help = "The batch operation interval in milliseconds")
+    private final long batchTimeMs;
+
+    public MongoAbstractConfig() {
+        this(null, null, null, DEFAULT_BATCH_SIZE, DEFAULT_BATCH_TIME_MS);
+    }
+
+    @JsonCreator
+    public MongoAbstractConfig(
+            @JsonProperty("mongoUri") String mongoUri,
+            @JsonProperty("database") String database,
+            @JsonProperty("collection") String collection,
+            @JsonProperty("batchSize") int batchSize,
+            @JsonProperty("batchTimeMs") long batchTimeMs
+    ) {
+        this.mongoUri = mongoUri;
+        this.database = database;
+        this.collection = collection;
+        this.batchSize = batchSize;
+        this.batchTimeMs = batchTimeMs;
+    }
+
+    public void validate() {
+        checkArgument(!StringUtils.isEmpty(getMongoUri()), "Required MongoDB URI is not set.");
+        checkArgument(getBatchSize() > 0, "batchSize must be a positive integer.");
+        checkArgument(getBatchTimeMs() > 0, "batchTimeMs must be a positive long.");
+    }
+}
diff --git a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoConfig.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoConfig.java
deleted file mode 100644
index 2d5c8c62f38..00000000000
--- a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoConfig.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.mongodb;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
-import com.google.common.base.Preconditions;
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Map;
-import lombok.Data;
-import lombok.experimental.Accessors;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.io.core.annotations.FieldDoc;
-
-/**
- * Configuration class for the MongoDB Connectors.
- */
-@Data
-@Accessors(chain = true)
-public class MongoConfig implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    public static final int DEFAULT_BATCH_SIZE = 100;
-
-    public static final long DEFAULT_BATCH_TIME_MS = 1000;
-
-    @FieldDoc(
-        required = true,
-        defaultValue = "",
-        help = "The uri of mongodb that the connector connects to"
-                + " (see: https://docs.mongodb.com/manual/reference/connection-string/)"
-    )
-    private String mongoUri;
-
-    @FieldDoc(
-        defaultValue = "",
-        help = "The database name to which the collection belongs and which must be watched for the source connector"
-                + " (required for the sink connector)"
-    )
-    private String database;
-
-    @FieldDoc(
-        defaultValue = "",
-        help = "The collection name where the messages are written or which is watched for the source connector"
-                + " (required for the sink connector)"
-    )
-    private String collection;
-
-    @FieldDoc(
-        defaultValue = "" + DEFAULT_BATCH_SIZE,
-        help = "The batch size of write to or read from the database"
-    )
-    private int batchSize = DEFAULT_BATCH_SIZE;
-
-    @FieldDoc(
-        defaultValue = "" + DEFAULT_BATCH_TIME_MS,
-        help = "The batch operation interval in milliseconds")
-    private long batchTimeMs = DEFAULT_BATCH_TIME_MS;
-
-
-    public static MongoConfig load(String yamlFile) throws IOException {
-        final ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
-        final MongoConfig cfg = mapper.readValue(new File(yamlFile), MongoConfig.class);
-
-        return cfg;
-    }
-
-    public static MongoConfig load(Map<String, Object> map) throws IOException {
-        final ObjectMapper mapper = new ObjectMapper();
-        final MongoConfig cfg = mapper.readValue(new ObjectMapper().writeValueAsString(map), MongoConfig.class);
-
-        return cfg;
-    }
-
-    public void validate(boolean dbRequired, boolean collectionRequired) {
-        if (StringUtils.isEmpty(getMongoUri())
-                || (dbRequired && StringUtils.isEmpty(getDatabase()))
-                || (collectionRequired && StringUtils.isEmpty(getCollection()))) {
-
-            throw new IllegalArgumentException("Required property not set.");
-        }
-
-        Preconditions.checkArgument(getBatchSize() > 0, "batchSize must be a positive integer.");
-        Preconditions.checkArgument(getBatchTimeMs() > 0, "batchTimeMs must be a positive long.");
-    }
-}
diff --git a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
index 18c630952d6..4077442e8af 100644
--- a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
+++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
@@ -57,12 +57,12 @@ import org.reactivestreams.Subscription;
     name = "mongo",
     type = IOType.SINK,
     help = "A sink connector that sends pulsar messages to mongodb",
-    configClass = MongoConfig.class
+    configClass = MongoSinkConfig.class
 )
 @Slf4j
 public class MongoSink implements Sink<byte[]> {
 
-    private MongoConfig mongoConfig;
+    private MongoSinkConfig mongoSinkConfig;
 
     private MongoClient mongoClient;
 
@@ -86,22 +86,22 @@ public class MongoSink implements Sink<byte[]> {
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
         log.info("Open MongoDB Sink");
 
-        mongoConfig = MongoConfig.load(config);
-        mongoConfig.validate(true, true);
+        mongoSinkConfig = MongoSinkConfig.load(config);
+        mongoSinkConfig.validate();
 
         if (clientProvider != null) {
             mongoClient = clientProvider.get();
         } else {
-            mongoClient = MongoClients.create(mongoConfig.getMongoUri());
+            mongoClient = MongoClients.create(mongoSinkConfig.getMongoUri());
         }
 
-        final MongoDatabase db = mongoClient.getDatabase(mongoConfig.getDatabase());
-        collection = db.getCollection(mongoConfig.getCollection());
+        final MongoDatabase db = mongoClient.getDatabase(mongoSinkConfig.getDatabase());
+        collection = db.getCollection(mongoSinkConfig.getCollection());
 
         incomingList = Lists.newArrayList();
         flushExecutor = Executors.newScheduledThreadPool(1);
         flushExecutor.scheduleAtFixedRate(() -> flush(),
-                mongoConfig.getBatchTimeMs(), mongoConfig.getBatchTimeMs(), TimeUnit.MILLISECONDS);
+                mongoSinkConfig.getBatchTimeMs(), mongoSinkConfig.getBatchTimeMs(), TimeUnit.MILLISECONDS);
     }
 
     @Override
@@ -119,7 +119,7 @@ public class MongoSink implements Sink<byte[]> {
             currentSize = incomingList.size();
         }
 
-        if (currentSize == mongoConfig.getBatchSize()) {
+        if (currentSize == mongoSinkConfig.getBatchSize()) {
             flushExecutor.execute(() -> flush());
         }
     }
diff --git a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSinkConfig.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSinkConfig.java
new file mode 100644
index 00000000000..500b0eceaed
--- /dev/null
+++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSinkConfig.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.mongodb;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.experimental.Accessors;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Configuration class for the MongoDB Sink Connectors.
+ */
+@Data
+@EqualsAndHashCode(callSuper = false)
+@Accessors(chain = true)
+public class MongoSinkConfig extends MongoAbstractConfig {
+
+    private static final long serialVersionUID = 8805978990904614084L;
+
+    @JsonCreator
+    public MongoSinkConfig(
+            @JsonProperty("mongoUri") String mongoUri,
+            @JsonProperty("database") String database,
+            @JsonProperty("collection") String collection,
+            @JsonProperty("batchSize") int batchSize,
+            @JsonProperty("batchTimeMs") long batchTimeMs
+    ) {
+        super(mongoUri, database, collection, batchSize, batchTimeMs);
+    }
+
+    public static MongoSinkConfig load(String yamlFile) throws IOException {
+        final ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        final MongoSinkConfig cfg = mapper.readValue(new File(yamlFile), MongoSinkConfig.class);
+
+        return cfg;
+    }
+
+    public static MongoSinkConfig load(Map<String, Object> map) throws IOException {
+        final ObjectMapper mapper = new ObjectMapper();
+        final MongoSinkConfig cfg = mapper.readValue(new ObjectMapper().writeValueAsString(map), MongoSinkConfig.class);
+
+        return cfg;
+    }
+
+    @Override
+    public void validate() {
+        super.validate();
+        checkArgument(!StringUtils.isEmpty(getDatabase()), "Required MongoDB database name is not set.");
+        checkArgument(!StringUtils.isEmpty(getCollection()), "Required MongoDB collection name is not set.");
+    }
+}
diff --git a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java
index 968df6afbba..b71b5eefd3b 100644
--- a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java
+++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java
@@ -40,6 +40,8 @@ import org.apache.pulsar.io.core.PushSource;
 import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
+import org.bson.BsonDocument;
+import org.bson.BsonTimestamp;
 import org.bson.Document;
 import org.reactivestreams.Subscriber;
 import org.reactivestreams.Subscription;
@@ -51,19 +53,17 @@ import org.reactivestreams.Subscription;
         name = "mongo",
         type = IOType.SOURCE,
         help = "A source connector that sends mongodb documents to pulsar",
-        configClass = MongoConfig.class
+        configClass = MongoSourceConfig.class
 )
 @Slf4j
 public class MongoSource extends PushSource<byte[]> {
 
     private final Supplier<MongoClient> clientProvider;
 
-    private MongoConfig mongoConfig;
+    private MongoSourceConfig mongoSourceConfig;
 
     private MongoClient mongoClient;
 
-    private Thread streamThread;
-
     private ChangeStreamPublisher<Document> stream;
 
 
@@ -79,38 +79,47 @@ public class MongoSource extends PushSource<byte[]> {
     public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
         log.info("Open MongoDB Source");
 
-        mongoConfig = MongoConfig.load(config);
-        mongoConfig.validate(false, false);
+        mongoSourceConfig = MongoSourceConfig.load(config);
+        mongoSourceConfig.validate();
 
         if (clientProvider != null) {
             mongoClient = clientProvider.get();
         } else {
-            mongoClient = MongoClients.create(mongoConfig.getMongoUri());
+            mongoClient = MongoClients.create(mongoSourceConfig.getMongoUri());
         }
 
-        if (StringUtils.isEmpty(mongoConfig.getDatabase())) {
+        String mongoDatabase = mongoSourceConfig.getDatabase();
+        if (StringUtils.isEmpty(mongoDatabase)) {
             // Watch all databases
-            log.info("Watch all");
+            log.info("Watch all databases");
             stream = mongoClient.watch();
 
         } else {
-            final MongoDatabase db = mongoClient.getDatabase(mongoConfig.getDatabase());
-
-            if (StringUtils.isEmpty(mongoConfig.getCollection())) {
+            final MongoDatabase db = mongoClient.getDatabase(mongoDatabase);
+            String mongoCollection = mongoSourceConfig.getCollection();
+            if (StringUtils.isEmpty(mongoCollection)) {
                 // Watch all collections in a database
                 log.info("Watch db: {}", db.getName());
                 stream = db.watch();
 
             } else {
                 // Watch a collection
-
-                final MongoCollection<Document> collection = db.getCollection(mongoConfig.getCollection());
-                log.info("Watch collection: {} {}", db.getName(), mongoConfig.getCollection());
+                final MongoCollection<Document> collection = db.getCollection(mongoCollection);
+                log.info("Watch collection: {}.{}", db.getName(), mongoCollection);
                 stream = collection.watch();
             }
         }
 
-        stream.batchSize(mongoConfig.getBatchSize()).fullDocument(FullDocument.UPDATE_LOOKUP);
+        stream.batchSize(mongoSourceConfig.getBatchSize())
+                .fullDocument(FullDocument.UPDATE_LOOKUP);
+
+        if (mongoSourceConfig.getSyncType() == SyncType.FULL_SYNC) {
+            // sync currently existing messages
+            // startAtOperationTime is the starting point for the change stream
+            // setting startAtOperationTime to 0 means the start point is the earliest
+            // see https://www.mongodb.com/docs/v4.2/reference/method/db.collection.watch/ for more information
+            stream.startAtOperationTime(new BsonTimestamp(0L));
+        }
 
         stream.subscribe(new Subscriber<ChangeStreamDocument<Document>>() {
             private ObjectMapper mapper = new ObjectMapper();
@@ -127,6 +136,12 @@ public class MongoSource extends PushSource<byte[]> {
                 try {
                     log.info("New change doc: {}", doc);
 
+                    BsonDocument documentKey = doc.getDocumentKey();
+                    if (documentKey == null) {
+                        log.warn("The document key is null");
+                        return;
+                    }
+
                     // Build a record with the essential information
                     final Map<String, Object> recordValue = new HashMap<>();
                     recordValue.put("fullDocument", doc.getFullDocument());
@@ -134,7 +149,7 @@ public class MongoSource extends PushSource<byte[]> {
                     recordValue.put("operation", doc.getOperationType());
 
                     consume(new DocRecord(
-                            Optional.of(doc.getDocumentKey().toJson()),
+                            Optional.of(documentKey.toJson()),
                             mapper.writeValueAsString(recordValue).getBytes(StandardCharsets.UTF_8)));
 
                 } catch (JsonProcessingException e) {
diff --git a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSourceConfig.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSourceConfig.java
new file mode 100644
index 00000000000..027c7743187
--- /dev/null
+++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSourceConfig.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.mongodb;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.experimental.Accessors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+/**
+ * Configuration class for the MongoDB Source Connectors.
+ */
+@Data
+@EqualsAndHashCode(callSuper = false)
+@Accessors(chain = true)
+public class MongoSourceConfig extends MongoAbstractConfig {
+
+    private static final long serialVersionUID = 1152890092264945317L;
+
+    public static final SyncType DEFAULT_SYNC_TYPE = SyncType.INCR_SYNC;
+
+    public static final String DEFAULT_SYNC_TYPE_STR = "INCR_SYNC";
+
+    @FieldDoc(
+            defaultValue = DEFAULT_SYNC_TYPE_STR,
+            help = "The message synchronization type of the source connector. "
+                    + "The field values can be of two types: incr and full. "
+                    + "When it is set to incr, the source connector will only watch for changes made from now on. "
+                    + "When it is set to full, the source connector will synchronize currently existing messages "
+                    + "and watch for future changes."
+    )
+    private SyncType syncType = DEFAULT_SYNC_TYPE;
+
+    @JsonCreator
+    public MongoSourceConfig(
+            @JsonProperty("mongoUri") String mongoUri,
+            @JsonProperty("database") String database,
+            @JsonProperty("collection") String collection,
+            @JsonProperty("batchSize") int batchSize,
+            @JsonProperty("batchTimeMs") long batchTimeMs,
+            @JsonProperty("syncType") String syncType
+    ) {
+        super(mongoUri, database, collection, batchSize, batchTimeMs);
+        setSyncType(syncType);
+    }
+
+    public static MongoSourceConfig load(String yamlFile) throws IOException {
+        final ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        final MongoSourceConfig cfg = mapper.readValue(new File(yamlFile), MongoSourceConfig.class);
+
+        return cfg;
+    }
+
+    public static MongoSourceConfig load(Map<String, Object> map) throws IOException {
+        final ObjectMapper mapper = new ObjectMapper();
+        final MongoSourceConfig cfg =
+                mapper.readValue(new ObjectMapper().writeValueAsString(map), MongoSourceConfig.class);
+
+        return cfg;
+    }
+
+    /**
+     * @param syncTypeStr Sync type string.
+     */
+    private void setSyncType(String syncTypeStr) {
+        // if syncType is not set, the default sync type is used
+        if (StringUtils.isEmpty(syncTypeStr)) {
+            this.syncType = DEFAULT_SYNC_TYPE;
+            return;
+        }
+
+        // if syncType is set but not correct, an exception will be thrown
+        try {
+            this.syncType = SyncType.valueOf(syncTypeStr.toUpperCase());
+        } catch (IllegalArgumentException e) {
+            throw new IllegalArgumentException("The value of the syncType field is incorrect.");
+        }
+    }
+
+    @Override
+    public void validate() {
+        super.validate();
+    }
+}
diff --git a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/TestHelper.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/SyncType.java
similarity index 53%
copy from pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/TestHelper.java
copy to pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/SyncType.java
index 82a0744c175..37d8268087a 100644
--- a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/TestHelper.java
+++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/SyncType.java
@@ -16,40 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.pulsar.io.mongodb;
 
-import java.util.HashMap;
-import java.util.Map;
-
-public final class TestHelper {
-
-    public static final String URI = "mongodb://localhost";
-
-    public static final String DB = "pulsar";
-
-    public static final String COLL = "messages";
-
-    public static final int BATCH_SIZE = 2;
-
-    public static final int BATCH_TIME = 500;
-
-
-    public static Map<String, Object> createMap(boolean full) {
-        final Map<String, Object> map = new HashMap<>();
-        map.put("mongoUri", URI);
-        map.put("database", DB);
-
-        if (full) {
-            map.put("collection", COLL);
-            map.put("batchSize", BATCH_SIZE);
-            map.put("batchTimeMs", BATCH_TIME);
-        }
+import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
 
-        return map;
-    }
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public enum SyncType {
 
-    private TestHelper() {
+    /**
+     * Synchronize all data.
+     */
+    FULL_SYNC,
 
-    }
+    /**
+     * Synchronize incremental data.
+     */
+    INCR_SYNC
 }
diff --git a/pulsar-io/mongo/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/mongo/src/main/resources/META-INF/services/pulsar-io.yaml
index 4fab476dc72..59be102d894 100644
--- a/pulsar-io/mongo/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/mongo/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,5 +20,5 @@ name: mongo
 description: MongoDB source and sink connector
 sinkClass: org.apache.pulsar.io.mongodb.MongoSink
 sourceClass: org.apache.pulsar.io.mongodb.MongoSource
-sourceConfigClass: org.apache.pulsar.io.mongodb.MongoConfig
-sinkConfigClass: org.apache.pulsar.io.mongodb.MongoConfig
+sourceConfigClass: org.apache.pulsar.io.mongodb.MongoSourceConfig
+sinkConfigClass: org.apache.pulsar.io.mongodb.MongoSinkConfig
diff --git a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoConfigTest.java b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoConfigTest.java
deleted file mode 100644
index 8495d87edab..00000000000
--- a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoConfigTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pulsar.io.mongodb;
-
-import org.testng.annotations.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Map;
-
-import static org.testng.Assert.assertEquals;
-
-public class MongoConfigTest {
-
-    private static File getFile(String fileName) {
-        return new File(MongoConfigTest.class.getClassLoader().getResource(fileName).getFile());
-    }
-
-    @Test
-    public void testMap() throws IOException {
-        final Map<String, Object> map = TestHelper.createMap(true);
-        final MongoConfig cfg = MongoConfig.load(map);
-
-        assertEquals(cfg.getMongoUri(), TestHelper.URI);
-        assertEquals(cfg.getDatabase(), TestHelper.DB);
-        assertEquals(cfg.getCollection(), TestHelper.COLL);
-        assertEquals(cfg.getBatchSize(), TestHelper.BATCH_SIZE);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class,
-            expectedExceptionsMessageRegExp = "Required property not set.")
-    public void testBadMap() throws IOException {
-        final Map<String, Object> map = TestHelper.createMap(false);
-        final MongoConfig cfg = MongoConfig.load(map);
-
-        cfg.validate(true, true);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class,
-            expectedExceptionsMessageRegExp = "batchSize must be a positive integer.")
-    public void testBadBatchSize() throws IOException {
-        final Map<String, Object> map = TestHelper.createMap(true);
-        map.put("batchSize", 0);
-        final MongoConfig cfg = MongoConfig.load(map);
-
-        cfg.validate(true, true);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class,
-            expectedExceptionsMessageRegExp = "batchTimeMs must be a positive long.")
-    public void testBadBatchTime() throws IOException {
-        final Map<String, Object> map = TestHelper.createMap(true);
-        map.put("batchTimeMs", 0);
-        final MongoConfig cfg = MongoConfig.load(map);
-
-        cfg.validate(true, true);
-    }
-
-    @Test
-    public void testYaml() throws IOException {
-        final File yaml = getFile("mongoSinkConfig.yaml");
-        final MongoConfig cfg = MongoConfig.load(yaml.getAbsolutePath());
-
-        assertEquals(cfg.getMongoUri(), TestHelper.URI);
-        assertEquals(cfg.getDatabase(), TestHelper.DB);
-        assertEquals(cfg.getCollection(), TestHelper.COLL);
-        assertEquals(cfg.getBatchSize(), TestHelper.BATCH_SIZE);
-        assertEquals(cfg.getBatchTimeMs(), TestHelper.BATCH_TIME);
-    }
-}
diff --git a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkConfigTest.java b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkConfigTest.java
new file mode 100644
index 00000000000..8cdcbe528ce
--- /dev/null
+++ b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkConfigTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.mongodb;
+
+import java.util.Map;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.testng.Assert.assertEquals;
+
+public class MongoSinkConfigTest {
+
+    @Test
+    public void testLoadMapConfig() throws IOException {
+        final Map<String, Object> commonConfigMap = TestHelper.createCommonConfigMap();
+        commonConfigMap.put("batchSize", TestHelper.BATCH_SIZE);
+        commonConfigMap.put("batchTimeMs", TestHelper.BATCH_TIME);
+
+        final MongoSinkConfig cfg = MongoSinkConfig.load(commonConfigMap);
+
+        assertEquals(cfg.getMongoUri(), TestHelper.URI);
+        assertEquals(cfg.getDatabase(), TestHelper.DB);
+        assertEquals(cfg.getCollection(), TestHelper.COLL);
+        assertEquals(cfg.getBatchSize(), TestHelper.BATCH_SIZE);
+        assertEquals(cfg.getBatchTimeMs(), TestHelper.BATCH_TIME);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+            expectedExceptionsMessageRegExp = "Required MongoDB URI is not set.")
+    public void testBadMongoUri() throws IOException {
+        final Map<String, Object> configMap = TestHelper.createCommonConfigMap();
+        TestHelper.removeMongoUri(configMap);
+
+        final MongoSinkConfig cfg = MongoSinkConfig.load(configMap);
+
+        cfg.validate();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+            expectedExceptionsMessageRegExp = "Required MongoDB database name is not set.")
+    public void testBadDatabase() throws IOException {
+        final Map<String, Object> configMap = TestHelper.createCommonConfigMap();
+        TestHelper.removeDatabase(configMap);
+
+        final MongoSinkConfig cfg = MongoSinkConfig.load(configMap);
+
+        cfg.validate();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+            expectedExceptionsMessageRegExp = "Required MongoDB collection name is not set.")
+    public void testBadCollection() throws IOException {
+        final Map<String, Object> configMap = TestHelper.createCommonConfigMap();
+        TestHelper.removeCollection(configMap);
+
+        final MongoSinkConfig cfg = MongoSinkConfig.load(configMap);
+
+        cfg.validate();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+            expectedExceptionsMessageRegExp = "batchSize must be a positive integer.")
+    public void testBadBatchSize() throws IOException {
+        final Map<String, Object> configMap = TestHelper.createCommonConfigMap();
+        TestHelper.putBatchSize(configMap, 0);
+
+        final MongoSinkConfig cfg = MongoSinkConfig.load(configMap);
+
+        cfg.validate();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+            expectedExceptionsMessageRegExp = "batchTimeMs must be a positive long.")
+    public void testBadBatchTime() throws IOException {
+        final Map<String, Object> configMap = TestHelper.createCommonConfigMap();
+        TestHelper.putBatchTime(configMap, 0L);
+
+        final MongoSinkConfig cfg = MongoSinkConfig.load(configMap);
+
+        cfg.validate();
+    }
+
+    @Test
+    public void testLoadYamlConfig() throws IOException {
+        final File yaml = TestHelper.getFile(MongoSinkConfigTest.class, "mongoSinkConfig.yaml");
+        final MongoSinkConfig cfg = MongoSinkConfig.load(yaml.getAbsolutePath());
+
+        assertEquals(cfg.getMongoUri(), TestHelper.URI);
+        assertEquals(cfg.getDatabase(), TestHelper.DB);
+        assertEquals(cfg.getCollection(), TestHelper.COLL);
+        assertEquals(cfg.getBatchSize(), TestHelper.BATCH_SIZE);
+        assertEquals(cfg.getBatchTimeMs(), TestHelper.BATCH_TIME);
+    }
+}
diff --git a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java
index 5fe9675ae5c..f1e5ef1d95c 100644
--- a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java
+++ b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java
@@ -76,7 +76,7 @@ public class MongoSinkTest {
     @BeforeMethod
     public void setUp() {
 
-        map = TestHelper.createMap(true);
+        map = TestHelper.createCommonConfigMap();
 
         mockRecord = mock(Record.class);
         mockSinkContext = mock(SinkContext.class);
diff --git a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceConfigTest.java b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceConfigTest.java
new file mode 100644
index 00000000000..63e01551ae2
--- /dev/null
+++ b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceConfigTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.mongodb;
+
+import static org.testng.Assert.assertEquals;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import org.testng.annotations.Test;
+
+public class MongoSourceConfigTest {
+
+    @Test
+    public void testLoadMapConfig() throws IOException {
+        final Map<String, Object> configMap = TestHelper.createCommonConfigMap();
+        TestHelper.putSyncType(configMap, TestHelper.SYNC_TYPE);
+
+        final MongoSourceConfig cfg = MongoSourceConfig.load(configMap);
+
+        assertEquals(cfg.getMongoUri(), TestHelper.URI);
+        assertEquals(cfg.getDatabase(), TestHelper.DB);
+        assertEquals(cfg.getCollection(), TestHelper.COLL);
+        assertEquals(cfg.getSyncType(), TestHelper.SYNC_TYPE);
+        assertEquals(cfg.getBatchSize(), TestHelper.BATCH_SIZE);
+        assertEquals(cfg.getBatchTimeMs(), TestHelper.BATCH_TIME);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+            expectedExceptionsMessageRegExp = "Required MongoDB URI is not set.")
+    public void testBadMongoUri() throws IOException {
+        final Map<String, Object> configMap = TestHelper.createCommonConfigMap();
+        TestHelper.removeMongoUri(configMap);
+
+        final MongoSourceConfig cfg = MongoSourceConfig.load(configMap);
+
+        cfg.validate();
+    }
+
+    /**
+     * Test whether an exception is thrown when the syncType field has an incorrect value.
+     */
+    @Test(expectedExceptions = {IllegalArgumentException.class, JsonMappingException.class})
+    public void testBadSyncType() throws IOException {
+        final Map<String, Object> configMap = TestHelper.createCommonConfigMap();
+        TestHelper.putSyncType(configMap, "wrong_sync_type_str");
+
+        final MongoSourceConfig cfg = MongoSourceConfig.load(configMap);
+
+        cfg.validate();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+            expectedExceptionsMessageRegExp = "batchSize must be a positive integer.")
+    public void testBadBatchSize() throws IOException {
+        final Map<String, Object> configMap = TestHelper.createCommonConfigMap();
+        TestHelper.putBatchSize(configMap, 0);
+
+        final MongoSourceConfig cfg = MongoSourceConfig.load(configMap);
+
+        cfg.validate();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+            expectedExceptionsMessageRegExp = "batchTimeMs must be a positive long.")
+    public void testBadBatchTime() throws IOException {
+        final Map<String, Object> configMap = TestHelper.createCommonConfigMap();
+        TestHelper.putBatchTime(configMap, 0L);
+
+        final MongoSourceConfig cfg = MongoSourceConfig.load(configMap);
+
+        cfg.validate();
+    }
+
+    @Test
+    public void testLoadYamlConfig() throws IOException {
+        final File yaml = TestHelper.getFile(MongoSourceConfigTest.class, "mongoSourceConfig.yaml");
+        final MongoSourceConfig cfg = MongoSourceConfig.load(yaml.getAbsolutePath());
+
+        assertEquals(cfg.getMongoUri(), TestHelper.URI);
+        assertEquals(cfg.getDatabase(), TestHelper.DB);
+        assertEquals(cfg.getCollection(), TestHelper.COLL);
+        assertEquals(cfg.getSyncType(), TestHelper.SYNC_TYPE);
+        assertEquals(cfg.getBatchSize(), TestHelper.BATCH_SIZE);
+        assertEquals(cfg.getBatchTimeMs(), TestHelper.BATCH_TIME);
+    }
+}
diff --git a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceTest.java b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceTest.java
index 06df54e1649..c9b9d02a20f 100644
--- a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceTest.java
+++ b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceTest.java
@@ -77,7 +77,7 @@ public class MongoSourceTest {
     @BeforeMethod
     public void setUp() {
 
-        map = TestHelper.createMap(true);
+        map = TestHelper.createCommonConfigMap();
 
         mockSourceContext = mock(SourceContext.class);
         mockMongoClient = mock(MongoClient.class);
diff --git a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/TestHelper.java b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/TestHelper.java
index 82a0744c175..fcfc46ecfee 100644
--- a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/TestHelper.java
+++ b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/TestHelper.java
@@ -19,6 +19,9 @@
 
 package org.apache.pulsar.io.mongodb;
 
+import static org.testng.Assert.assertNotNull;
+import java.io.File;
+import java.net.URL;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -30,26 +33,57 @@ public final class TestHelper {
 
     public static final String COLL = "messages";
 
+    public static final SyncType SYNC_TYPE = SyncType.FULL_SYNC;
+
     public static final int BATCH_SIZE = 2;
 
-    public static final int BATCH_TIME = 500;
+    public static final long BATCH_TIME = 500L;
+
+    private TestHelper() {
+    }
 
+    public static File getFile(Class<?> clazz, String fileName) {
+        ClassLoader classLoader = clazz.getClassLoader();
+        URL url = classLoader.getResource(fileName);
+        assertNotNull(url);
+        return new File(url.getFile());
+    }
 
-    public static Map<String, Object> createMap(boolean full) {
+    /**
+     * @return a map with all common fields
+     */
+    public static Map<String, Object> createCommonConfigMap() {
         final Map<String, Object> map = new HashMap<>();
         map.put("mongoUri", URI);
         map.put("database", DB);
+        map.put("collection", COLL);
+        map.put("batchSize", BATCH_SIZE);
+        map.put("batchTimeMs", BATCH_TIME);
+        return map;
+    }
+
+    public static void removeMongoUri(Map<String, Object> configMap) {
+        configMap.remove("mongoUri");
+    }
 
-        if (full) {
-            map.put("collection", COLL);
-            map.put("batchSize", BATCH_SIZE);
-            map.put("batchTimeMs", BATCH_TIME);
-        }
+    public static void removeDatabase(Map<String, Object> configMap) {
+        configMap.remove("database");
+    }
 
-        return map;
+    public static void removeCollection(Map<String, Object> configMap) {
+        configMap.remove("collection");
     }
 
-    private TestHelper() {
+    public static void putSyncType(Map<String, Object> configMap, Object syncType) {
+        configMap.put("syncType", syncType);
+    }
+
+    public static void putBatchSize(Map<String, Object> configMap, int batchSize) {
+        configMap.put("batchSize", batchSize);
+    }
 
+    public static void putBatchTime(Map<String, Object> configMap, long batchTime) {
+        configMap.put("batchTimeMs", batchTime);
     }
+
 }
diff --git a/pulsar-io/mongo/src/test/resources/mongoSinkConfig.yaml b/pulsar-io/mongo/src/test/resources/mongoSinkConfig.yaml
index f7a9ea28a76..9857beab556 100644
--- a/pulsar-io/mongo/src/test/resources/mongoSinkConfig.yaml
+++ b/pulsar-io/mongo/src/test/resources/mongoSinkConfig.yaml
@@ -23,4 +23,4 @@
    "collection": "messages",
    "batchSize": 2,
    "batchTimeMs": 500
-}
\ No newline at end of file
+}
diff --git a/pulsar-io/mongo/src/test/resources/mongoSinkConfig.yaml b/pulsar-io/mongo/src/test/resources/mongoSourceConfig.yaml
similarity index 96%
copy from pulsar-io/mongo/src/test/resources/mongoSinkConfig.yaml
copy to pulsar-io/mongo/src/test/resources/mongoSourceConfig.yaml
index f7a9ea28a76..68c783ca1ca 100644
--- a/pulsar-io/mongo/src/test/resources/mongoSinkConfig.yaml
+++ b/pulsar-io/mongo/src/test/resources/mongoSourceConfig.yaml
@@ -21,6 +21,7 @@
    "mongoUri": "mongodb://localhost",
    "database": "pulsar",
    "collection": "messages",
+   "syncType": "full_sync",
    "batchSize": 2,
    "batchTimeMs": 500
-}
\ No newline at end of file
+}
diff --git a/site2/docs/io-connectors.md b/site2/docs/io-connectors.md
index 83986c8c827..1b6f9e85bc2 100644
--- a/site2/docs/io-connectors.md
+++ b/site2/docs/io-connectors.md
@@ -101,7 +101,13 @@ Pulsar has various source connectors, which are sorted alphabetically as below.
 * [Configuration](io-kinesis-source.md#configuration)
   
 * [Java class](https://github.com/apache/pulsar/blob/master/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSource.java)
-  
+
+### MongoDB
+
+* [Configuration](io-mongo-source.md#configuration)
+
+* [Java class](https://github.com/apache/pulsar/blob/master/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java)
+
 ### Netty
 
 * [Configuration](io-netty-source.md#configuration)
diff --git a/site2/docs/io-mongo-source.md b/site2/docs/io-mongo-source.md
new file mode 100644
index 00000000000..a525cb79b32
--- /dev/null
+++ b/site2/docs/io-mongo-source.md
@@ -0,0 +1,55 @@
+---
+id: io-mongo-source
+title: MongoDB source connector
+sidebar_label: "MongoDB source connector"
+---
+
+The MongoDB source connector pulls documents from MongoDB and persists the messages to Pulsar topics.
+
+This guide explains how to configure and use the MongoDB source connector.
+
+## Configuration
+
+The configuration of the MongoDB source connector has the following properties.
+
+### Property
+
+| Name          | Type   | Required | Default            | Description                                                                                                                                                                                    |
+|---------------|--------|----------|--------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `mongoUri`    | String | true     | " " (empty string) | The MongoDB URI to which the connector connects. <br /><br />For more information, see [connection string URI format](https://docs.mongodb.com/manual/reference/connection-string/).           |
+| `database`    | String | false    | " " (empty string) | The name of the watched database. <br /><br />If this field is not set, the source connector will watch the entire MongoDB for all changes.                                                    |
+| `collection`  | String | false    | " " (empty string) | The name of the watched collection. <br /><br />If this field is not set, the source connector will watch the database for all changes.                                                        |
+| `syncType`    | String | false    | "INCR_SYNC"        | The synchronization type between MongoDB and Pulsar: full synchronization or incremental synchronization. <br /><br /> Valid values are `full_sync`, `FULL_SYNC`, `incr_sync` and `INCR_SYNC`. |
+| `batchSize`   | int    | false    | 100                | The batch size of pulling documents from collections.                                                                                                                                          |
+| `batchTimeMs` | long   | false    | 1000               | The batch operation interval in milliseconds.                                                                                                                                                  |
+
+### Example
+
+Before using the MongoDB source connector, you need to create a configuration file in one of the following formats.
+
+* JSON
+
+  ```json
+  {
+     "configs": {
+        "mongoUri": "mongodb://localhost:27017",
+        "database": "pulsar",
+        "collection": "messages",
+        "syncType": "full_sync",
+        "batchSize": 2,
+        "batchTimeMs": 500
+     }
+  }
+  ```
+
+* YAML
+
+  ```yaml
+  configs:
+      mongoUri: "mongodb://localhost:27017"
+      database: "pulsar"
+      collection: "messages"
+      syncType: "full_sync"
+      batchSize: 2
+      batchTimeMs: 500
+  ```
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index c491d005fda..7b842bc58b0 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -34,6 +34,7 @@
 
   <properties>
     <integrationTestSuiteFile>pulsar.xml</integrationTestSuiteFile>
+    <mongo-reactivestreams.version>4.1.2</mongo-reactivestreams.version>
   </properties>
 
   <dependencies>
@@ -208,6 +209,19 @@
       <scope>test</scope>
     </dependency>
 
+    <!-- mongodb -->
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>mongodb</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mongodb</groupId>
+      <artifactId>mongodb-driver-reactivestreams</artifactId>
+      <version>${mongo-reactivestreams.version}</version>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/MongoSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/MongoSourceTester.java
new file mode 100644
index 00000000000..273a1ecdd48
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/MongoSourceTester.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.tests.integration.io.sources;
+
+import com.mongodb.client.model.changestream.ChangeStreamDocument;
+import com.mongodb.client.model.changestream.FullDocument;
+import com.mongodb.reactivestreams.client.ChangeStreamPublisher;
+import com.mongodb.reactivestreams.client.MongoClient;
+import com.mongodb.reactivestreams.client.MongoClients;
+import com.mongodb.reactivestreams.client.MongoDatabase;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.tests.integration.containers.DebeziumMongoDbContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.bson.Document;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.MongoDBContainer;
+
+@Slf4j
+public class MongoSourceTester extends SourceTester<MongoDBContainer> {
+
+    private static final String SOURCE_TYPE = "mongo";
+
+    private static final String DEFAULT_DATABASE = "test";
+
+    private static final int DEFAULT_BATCH_SIZE = 2;
+
+    private final MongoDBContainer mongoContainer;
+
+    private final PulsarCluster pulsarCluster;
+
+    protected MongoSourceTester(MongoDBContainer mongoContainer, PulsarCluster pulsarCluster) {
+        super(SOURCE_TYPE);
+        this.mongoContainer = mongoContainer;
+        this.pulsarCluster = pulsarCluster;
+
+        sourceConfig.put("mongoUri", mongoContainer.getConnectionString());
+        sourceConfig.put("database", DEFAULT_DATABASE);
+        sourceConfig.put("syncType", "full_sync");
+        sourceConfig.put("batchSize", DEFAULT_BATCH_SIZE);
+    }
+
+    @Override
+    public void setServiceContainer(MongoDBContainer serviceContainer) {
+        log.info("start mongodb server container.");
+        pulsarCluster.startService(DebeziumMongoDbContainer.NAME, mongoContainer);
+    }
+
+    @Override
+    public void prepareSource() throws Exception {
+        MongoClient mongoClient = MongoClients.create(mongoContainer.getConnectionString());
+        MongoDatabase db = mongoClient.getDatabase(DEFAULT_DATABASE);
+        log.info("Subscribing mongodb change streams on: {}", mongoContainer.getReplicaSetUrl(DEFAULT_DATABASE));
+
+        ChangeStreamPublisher<Document> stream = db.watch();
+        stream.batchSize(DEFAULT_BATCH_SIZE)
+                .fullDocument(FullDocument.UPDATE_LOOKUP);
+
+        stream.subscribe(new Subscriber<>() {
+            @Override
+            public void onSubscribe(Subscription subscription) {
+                subscription.request(Integer.MAX_VALUE);
+            }
+
+            @Override
+            public void onNext(ChangeStreamDocument<Document> doc) {
+                log.info("New change doc: {}", doc);
+            }
+
+            @Override
+            public void onError(Throwable error) {
+                log.error("Subscriber error", error);
+            }
+
+            @Override
+            public void onComplete() {
+                log.info("Subscriber complete");
+            }
+        });
+
+        log.info("Successfully subscribe to mongodb change streams");
+    }
+
+    @Override
+    public void prepareInsertEvent() throws Exception {
+        Container.ExecResult execResult = this.mongoContainer.execInContainer(
+                "/usr/bin/mongo",
+                "--eval",
+                "db.products.insert" +
+                        "({" +
+                        "name: \"test-mongo\"," +
+                        "description: \"test message\"" +
+                        "})"
+        );
+        log.info("Successfully insert a message: {}", execResult.getStdout());
+    }
+
+    @Override
+    public void prepareDeleteEvent() throws Exception {
+        Container.ExecResult execResult = mongoContainer.execInContainer(
+                "/usr/bin/mongo",
+                "--eval",
+                "db.products.deleteOne" +
+                        "({" +
+                        "name: \"test-mongo\"" +
+                        "})"
+        );
+        log.info("Successfully delete a message: {}", execResult.getStdout());
+    }
+
+    @Override
+    public void prepareUpdateEvent() throws Exception {
+        Container.ExecResult execResult = mongoContainer.execInContainer(
+                "/usr/bin/mongo",
+                "--eval",
+                "db.products.update" +
+                        "(" +
+                        "{name: \"test-mongo-source\"}" +
+                        "," +
+                        "{$set:{name:\"test-mongo-update\", description: \"updated message\"}}" +
+                        ")"
+        );
+        log.info("Successfully update a message: {}", execResult.getStdout());
+    }
+
+    @Override
+    public Map<String, String> produceSourceMessages(int numMessages) throws Exception {
+        log.info("mongodb server already contains preconfigured data.");
+        return null;
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (mongoContainer != null) {
+            mongoContainer.close();
+        }
+    }
+}