You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:22:32 UTC

[rocketmq-connect] 09/13: reformat code and add more test case

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit f193fcdc1c654837d9dfee60ea4b2e56afbd6d5d
Author: 李平 <lp...@alibaba-inc.com>
AuthorDate: Thu Aug 15 15:22:58 2019 +0800

    reformat code and add more test case
---
 README.md                                          |  12 ++
 .../org/apache/connect/mongo/SourceTaskConfig.java | 116 +++++++++++++-
 .../mongo/connector/MongoSourceConnector.java      |   6 +-
 .../connect/mongo/connector/MongoSourceTask.java   |  26 ++--
 .../mongo/connector/builder/MongoDataEntry.java    |  39 ++---
 .../connect/mongo/initsync/CollectionMeta.java     |   6 +-
 .../apache/connect/mongo/initsync/InitSync.java    |  30 ++--
 .../apache/connect/mongo/replicator/Constants.java |   6 -
 .../apache/connect/mongo/replicator/Filter.java    |  15 +-
 .../mongo/replicator/MongoClientFactory.java       | 113 ++++++++++++++
 .../connect/mongo/replicator/ReplicaSet.java       |  11 +-
 .../connect/mongo/replicator/ReplicaSetConfig.java |  45 +++---
 .../connect/mongo/replicator/ReplicaSets.java      |   8 +-
 .../mongo/replicator/ReplicaSetsContext.java       |  43 +----
 .../connect/mongo/replicator/ReplicatorTask.java   |  16 +-
 .../mongo/replicator/event/EventConverter.java     |  13 +-
 .../mongo/replicator/event/ReplicationEvent.java   |  36 ++---
 .../java/org/apache/connect/mongo/FilterTest.java  |  15 +-
 .../org/apache/connect/mongo/MongoFactoryTest.java | 173 +++++++++++++++++++++
 .../connect/mongo/MongoSourceConnectorTest.java    |  19 +--
 .../apache/connect/mongo/MongoSourceTaskTest.java  | 143 +++++++++++++++++
 .../java/org/apache/connect/mongo/MongoTest.java   |  18 +--
 .../apache/connect/mongo/ReplicaContextTest.java   |   3 +-
 .../org/apache/connect/mongo/ReplicaSetTest.java   |  59 +++++++
 .../org/apache/connect/mongo/ReplicaSetsTest.java  |  19 +--
 25 files changed, 758 insertions(+), 232 deletions(-)

diff --git a/README.md b/README.md
new file mode 100644
index 0000000..fb2c6c6
--- /dev/null
+++ b/README.md
@@ -0,0 +1,12 @@
+# RocketMQ-connect-mongo
+
+This is the source connector module for MongoDB; you can run it through the RocketMQ connector API.
+
+Some JUnit tests rely on a MongoDB database, which you can start in a Docker container:
+
+`docker run -p27027:27017 --name mongo-test -d  mongo:4.0.10 --replSet "repl1"`
+
+and then init a mongo replicaSet
+
+Run `docker exec -it mongo-test mongo`, then `rs.initiate()` in the mongo shell; after that you can run all the JUnit tests.
+
diff --git a/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java b/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java
index b79b2e9..3df9dc4 100644
--- a/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java
+++ b/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java
@@ -1,12 +1,11 @@
 package org.apache.connect.mongo;
 
 import io.openmessaging.KeyValue;
-import org.bson.BsonTimestamp;
-
 import java.lang.reflect.Method;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
+import org.bson.BsonTimestamp;
 
 public class SourceTaskConfig {
 
@@ -18,15 +17,50 @@ public class SourceTaskConfig {
     private String positionTimeStamp;
     private String positionInc;
     private String dataSync;
+    private String serverSelectionTimeoutMS;
+    private String connectTimeoutMS;
+    private String socketTimeoutMS;
+    private String ssl;
+    private String tsl;
+    private String tlsInsecure;
+    private String sslInvalidHostNameAllowed;
+    private String tlsAllowInvalidHostnames;
+    private String compressors;
+    private String zlibCompressionLevel;
+    private String trustStore;
+    private String trustStorePassword;
     private int copyThread = Runtime.getRuntime().availableProcessors();
 
-
     public static final Set<String> REQUEST_CONFIG = Collections.unmodifiableSet(new HashSet<String>() {
         {
             add("mongoAddr");
         }
     });
 
+    public String getTrustStore() {
+        return trustStore;
+    }
+
+    public void setTrustStore(String trustStore) {
+        this.trustStore = trustStore;
+    }
+
+    public String getTrustStorePassword() {
+        return trustStorePassword;
+    }
+
+    public void setTrustStorePassword(String trustStorePassword) {
+        this.trustStorePassword = trustStorePassword;
+    }
+
+    public String getZlibCompressionLevel() {
+        return zlibCompressionLevel;
+    }
+
+    public void setZlibCompressionLevel(String zlibCompressionLevel) {
+        this.zlibCompressionLevel = zlibCompressionLevel;
+    }
+
     public String getPositionInc() {
         return positionInc;
     }
@@ -67,7 +101,6 @@ public class SourceTaskConfig {
         this.mongoAddr = mongoAddr;
     }
 
-
     public String getMongoUserName() {
         return mongoUserName;
     }
@@ -84,7 +117,6 @@ public class SourceTaskConfig {
         this.mongoPassWord = mongoPassWord;
     }
 
-
     public String getDataSync() {
         return dataSync;
     }
@@ -93,15 +125,86 @@ public class SourceTaskConfig {
         this.dataSync = dataSync;
     }
 
-
     public String getReplicaSet() {
         return replicaSet;
     }
 
+    public String getServerSelectionTimeoutMS() {
+        return serverSelectionTimeoutMS;
+    }
+
+    public void setServerSelectionTimeoutMS(String serverSelectionTimeoutMS) {
+        this.serverSelectionTimeoutMS = serverSelectionTimeoutMS;
+    }
+
     public void setReplicaSet(String replicaSet) {
         this.replicaSet = replicaSet;
     }
 
+    public String getConnectTimeoutMS() {
+        return connectTimeoutMS;
+    }
+
+    public void setConnectTimeoutMS(String connectTimeoutMS) {
+        this.connectTimeoutMS = connectTimeoutMS;
+    }
+
+    public String getSocketTimeoutMS() {
+        return socketTimeoutMS;
+    }
+
+    public void setSocketTimeoutMS(String socketTimeoutMS) {
+        this.socketTimeoutMS = socketTimeoutMS;
+    }
+
+    public String getSsl() {
+        return ssl;
+    }
+
+    public void setSsl(String ssl) {
+        this.ssl = ssl;
+    }
+
+    public String getTsl() {
+        return tsl;
+    }
+
+    public void setTsl(String tsl) {
+        this.tsl = tsl;
+    }
+
+    public String getTlsInsecure() {
+        return tlsInsecure;
+    }
+
+    public void setTlsInsecure(String tlsInsecure) {
+        this.tlsInsecure = tlsInsecure;
+    }
+
+    public String getSslInvalidHostNameAllowed() {
+        return sslInvalidHostNameAllowed;
+    }
+
+    public void setSslInvalidHostNameAllowed(String sslInvalidHostNameAllowed) {
+        this.sslInvalidHostNameAllowed = sslInvalidHostNameAllowed;
+    }
+
+    public String getTlsAllowInvalidHostnames() {
+        return tlsAllowInvalidHostnames;
+    }
+
+    public void setTlsAllowInvalidHostnames(String tlsAllowInvalidHostnames) {
+        this.tlsAllowInvalidHostnames = tlsAllowInvalidHostnames;
+    }
+
+    public String getCompressors() {
+        return compressors;
+    }
+
+    public void setCompressors(String compressors) {
+        this.compressors = compressors;
+    }
+
     public void load(KeyValue props) {
 
         properties2Object(props, this);
@@ -148,7 +251,6 @@ public class SourceTaskConfig {
         }
     }
 
-
     public BsonTimestamp getPosition() {
         return new BsonTimestamp(Integer.valueOf(positionTimeStamp), Integer.valueOf(positionInc));
     }
diff --git a/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java b/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java
index 2b28ea2..e3dfb6f 100644
--- a/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java
+++ b/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java
@@ -3,19 +3,17 @@ package org.apache.connect.mongo.connector;
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.Task;
 import io.openmessaging.connector.api.source.SourceConnector;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.connect.mongo.SourceTaskConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.List;
-
 public class MongoSourceConnector extends SourceConnector {
 
     private Logger logger = LoggerFactory.getLogger(this.getClass());
     private KeyValue keyValueConfig;
 
-
     @Override
     public String verifyAndSetConfig(KeyValue config) {
         for (String requestKey : SourceTaskConfig.REQUEST_CONFIG) {
diff --git a/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
index 407608a..da244cd 100644
--- a/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
+++ b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
@@ -4,16 +4,19 @@ import com.alibaba.fastjson.JSONObject;
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.data.SourceDataEntry;
 import io.openmessaging.connector.api.source.SourceTask;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.connect.mongo.SourceTaskConfig;
-import org.apache.connect.mongo.replicator.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.connect.mongo.SourceTaskConfig;
+import org.apache.connect.mongo.replicator.Constants;
+import org.apache.connect.mongo.replicator.ReplicaSet;
+import org.apache.connect.mongo.replicator.ReplicaSetConfig;
+import org.apache.connect.mongo.replicator.ReplicaSets;
+import org.apache.connect.mongo.replicator.ReplicaSetsContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class MongoSourceTask extends SourceTask {
 
@@ -42,7 +45,7 @@ public class MongoSourceTask extends SourceTask {
             replicaSets = ReplicaSets.create(sourceTaskConfig.getMongoAddr());
             replicaSets.getReplicaConfigByName().forEach((replicaSetName, replicaSetConfig) -> {
                 ByteBuffer byteBuffer = this.context.positionStorageReader().getPosition(ByteBuffer.wrap(
-                        replicaSetName.getBytes()));
+                    replicaSetName.getBytes()));
                 if (byteBuffer != null && byteBuffer.array().length > 0) {
                     String positionJson = new String(byteBuffer.array(), StandardCharsets.UTF_8);
                     ReplicaSetConfig.Position position = JSONObject.parseObject(positionJson, ReplicaSetConfig.Position.class);
@@ -50,11 +53,11 @@ public class MongoSourceTask extends SourceTask {
                 } else {
                     ReplicaSetConfig.Position position = replicaSetConfig.emptyPosition();
                     position.setTimeStamp(sourceTaskConfig.getPositionTimeStamp() != null
-                            && pattern.matcher(sourceTaskConfig.getPositionTimeStamp()).matches()
-                            ? Integer.valueOf(sourceTaskConfig.getPositionTimeStamp()) : 0);
+                        && pattern.matcher(sourceTaskConfig.getPositionTimeStamp()).matches()
+                        ? Integer.valueOf(sourceTaskConfig.getPositionTimeStamp()) : 0);
                     position.setInc(sourceTaskConfig.getPositionInc() != null
-                            && pattern.matcher(sourceTaskConfig.getPositionInc()).matches()
-                            ? Integer.valueOf(sourceTaskConfig.getPositionInc()) : 0);
+                        && pattern.matcher(sourceTaskConfig.getPositionInc()).matches()
+                        ? Integer.valueOf(sourceTaskConfig.getPositionInc()) : 0);
                     position.setInitSync(StringUtils.equals(sourceTaskConfig.getDataSync(), Constants.INITSYNC) ? true : false);
                     replicaSetConfig.setPosition(position);
                 }
@@ -64,7 +67,6 @@ public class MongoSourceTask extends SourceTask {
                 replicaSet.start();
             });
 
-
         } catch (Throwable throwable) {
             logger.error("task start error", throwable);
             stop();
diff --git a/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java b/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java
index 7c0db7d..87d92ea 100644
--- a/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java
+++ b/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java
@@ -1,19 +1,27 @@
 package org.apache.connect.mongo.connector.builder;
 
-
 import com.alibaba.fastjson.JSONObject;
-import io.openmessaging.connector.api.data.*;
+import io.openmessaging.connector.api.data.DataEntryBuilder;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.FieldType;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SourceDataEntry;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import org.apache.connect.mongo.replicator.Constants;
 import org.apache.connect.mongo.replicator.ReplicaSetConfig;
 import org.apache.connect.mongo.replicator.event.OperationType;
 import org.apache.connect.mongo.replicator.event.ReplicationEvent;
 import org.bson.BsonTimestamp;
 
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-
-import static org.apache.connect.mongo.replicator.Constants.*;
+import static org.apache.connect.mongo.replicator.Constants.CREATED;
+import static org.apache.connect.mongo.replicator.Constants.NAMESPACE;
+import static org.apache.connect.mongo.replicator.Constants.OBJECTID;
+import static org.apache.connect.mongo.replicator.Constants.OPERATIONTYPE;
+import static org.apache.connect.mongo.replicator.Constants.PATCH;
+import static org.apache.connect.mongo.replicator.Constants.TIMESTAMP;
+import static org.apache.connect.mongo.replicator.Constants.VERSION;
 
 public class MongoDataEntry {
 
@@ -28,8 +36,8 @@ public class MongoDataEntry {
             Schema schema = createdSchema(replicaSetConfig.getReplicaSetName());
             dataEntryBuilder = new DataEntryBuilder(schema);
             dataEntryBuilder.timestamp(System.currentTimeMillis())
-                    .queue(event.getNamespace().replace(".", "-").replace("$", "-"))
-                    .entryType(event.getEntryType());
+                .queue(event.getNamespace().replace(".", "-").replace("$", "-"))
+                .entryType(event.getEntryType());
 
             dataEntryBuilder.putFiled(CREATED, event.getDocument().toJson());
             dataEntryBuilder.putFiled(NAMESPACE, event.getNamespace());
@@ -38,8 +46,8 @@ public class MongoDataEntry {
             Schema schema = oplogSchema(replicaSetConfig.getReplicaSetName());
             dataEntryBuilder = new DataEntryBuilder(schema);
             dataEntryBuilder.timestamp(System.currentTimeMillis())
-                    .queue(event.getNamespace().replace(".", "-").replace("$", "-"))
-                    .entryType(event.getEntryType());
+                .queue(event.getNamespace().replace(".", "-").replace("$", "-"))
+                .entryType(event.getEntryType());
             dataEntryBuilder.putFiled(OPERATIONTYPE, event.getOperationType().name());
             dataEntryBuilder.putFiled(TIMESTAMP, event.getTimestamp().getValue());
             dataEntryBuilder.putFiled(VERSION, event.getV());
@@ -48,15 +56,13 @@ public class MongoDataEntry {
             dataEntryBuilder.putFiled(OBJECTID, event.getObjectId().isPresent() ? JSONObject.toJSONString(event.getObjectId().get()) : "");
         }
 
-
         String position = createPosition(event, replicaSetConfig);
         SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
-                ByteBuffer.wrap(replicaSetConfig.getReplicaSetName().getBytes(StandardCharsets.UTF_8)),
-                ByteBuffer.wrap(position.getBytes(StandardCharsets.UTF_8)));
+            ByteBuffer.wrap(replicaSetConfig.getReplicaSetName().getBytes(StandardCharsets.UTF_8)),
+            ByteBuffer.wrap(position.getBytes(StandardCharsets.UTF_8)));
         return sourceDataEntry;
     }
 
-
     private static String createPosition(ReplicationEvent event, ReplicaSetConfig replicaSetConfig) {
         ReplicaSetConfig.Position position = replicaSetConfig.emptyPosition();
         BsonTimestamp timestamp = event.getTimestamp();
@@ -76,7 +82,6 @@ public class MongoDataEntry {
         return schema;
     }
 
-
     private static Schema oplogSchema(String dataSourceName) {
         Schema schema = new Schema();
         schema.setDataSource(dataSourceName);
@@ -85,7 +90,6 @@ public class MongoDataEntry {
         return schema;
     }
 
-
     private static void createdField(Schema schema) {
         Field namespace = new Field(0, NAMESPACE, FieldType.STRING);
         schema.getFields().add(namespace);
@@ -109,5 +113,4 @@ public class MongoDataEntry {
         schema.getFields().add(objectId);
     }
 
-
 }
diff --git a/src/main/java/org/apache/connect/mongo/initsync/CollectionMeta.java b/src/main/java/org/apache/connect/mongo/initsync/CollectionMeta.java
index 018418c..9c73eac 100644
--- a/src/main/java/org/apache/connect/mongo/initsync/CollectionMeta.java
+++ b/src/main/java/org/apache/connect/mongo/initsync/CollectionMeta.java
@@ -34,8 +34,8 @@ public class CollectionMeta {
     @Override
     public String toString() {
         return "CollectionMeta{" +
-                "databaseName='" + databaseName + '\'' +
-                ", collectionName='" + collectionName + '\'' +
-                '}';
+            "databaseName='" + databaseName + '\'' +
+            ", collectionName='" + collectionName + '\'' +
+            '}';
     }
 }
diff --git a/src/main/java/org/apache/connect/mongo/initsync/InitSync.java b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
index d92e968..6cdbe06 100644
--- a/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
+++ b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
@@ -3,6 +3,13 @@ package org.apache.connect.mongo.initsync;
 import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoCursor;
 import com.mongodb.client.MongoIterable;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.connect.mongo.replicator.ReplicaSet;
 import org.apache.connect.mongo.replicator.ReplicaSetConfig;
 import org.apache.connect.mongo.replicator.ReplicaSetsContext;
@@ -13,14 +20,6 @@ import org.bson.Document;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-
 public class InitSync {
 
     private Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -34,7 +33,8 @@ public class InitSync {
     private CountDownLatch countDownLatch;
     private ReplicaSet replicaSet;
 
-    public InitSync(ReplicaSetConfig replicaSetConfig, MongoClient mongoClient, ReplicaSetsContext context, ReplicaSet replicaSet) {
+    public InitSync(ReplicaSetConfig replicaSetConfig, MongoClient mongoClient, ReplicaSetsContext context,
+        ReplicaSet replicaSet) {
         this.replicaSetConfig = replicaSetConfig;
         this.mongoClient = mongoClient;
         this.context = context;
@@ -89,7 +89,6 @@ public class InitSync {
 
     }
 
-
     class CopyRunner implements Runnable {
 
         private MongoClient mongoClient;
@@ -97,7 +96,8 @@ public class InitSync {
         private CollectionMeta collectionMeta;
         private ReplicaSet replicaSet;
 
-        public CopyRunner(MongoClient mongoClient, CountDownLatch countDownLatch, CollectionMeta collectionMeta, ReplicaSet replicaSet) {
+        public CopyRunner(MongoClient mongoClient, CountDownLatch countDownLatch, CollectionMeta collectionMeta,
+            ReplicaSet replicaSet) {
             this.mongoClient = mongoClient;
             this.countDownLatch = countDownLatch;
             this.collectionMeta = collectionMeta;
@@ -110,10 +110,10 @@ public class InitSync {
             int count = 0;
             try {
                 MongoCursor<Document> mongoCursor = mongoClient.getDatabase(collectionMeta.getDatabaseName())
-                        .getCollection(collectionMeta.getCollectionName())
-                        .find()
-                        .batchSize(200)
-                        .iterator();
+                    .getCollection(collectionMeta.getCollectionName())
+                    .find()
+                    .batchSize(200)
+                    .iterator();
                 while (replicaSet.isRuning() && mongoCursor.hasNext()) {
                     if (context.initSyncAbort()) {
                         logger.info("init sync database:{}, collection:{} abort, has copy:{} document", collectionMeta.getDatabaseName(), collectionMeta.getCollectionName(), count);
diff --git a/src/main/java/org/apache/connect/mongo/replicator/Constants.java b/src/main/java/org/apache/connect/mongo/replicator/Constants.java
index 1a91a57..c895bd6 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/Constants.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/Constants.java
@@ -2,13 +2,9 @@ package org.apache.connect.mongo.replicator;
 
 public class Constants {
 
-
-
     public static final String MONGO_LOCAL_DATABASE = "local";
     public static final String MONGO_OPLOG_RS = "oplog.rs";
 
-
-
     public static final String OPERATIONTYPE = "op";
     public static final String TIMESTAMP = "ts";
     public static final String VERSION = "v";
@@ -17,11 +13,9 @@ public class Constants {
     public static final String OPERATION = "o";
     public static final String OBJECTID = "o2";
 
-
     public static final String CREATED = "created";
     public static final String PATCH = "patch";
 
-
     public static final String INITSYNC = "initSync";
 
 }
diff --git a/src/main/java/org/apache/connect/mongo/replicator/Filter.java b/src/main/java/org/apache/connect/mongo/replicator/Filter.java
index 3e431a3..fd26163 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/Filter.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/Filter.java
@@ -1,26 +1,23 @@
 package org.apache.connect.mongo.replicator;
 
-
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.TypeReference;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.connect.mongo.SourceTaskConfig;
 import org.apache.connect.mongo.initsync.CollectionMeta;
 import org.apache.connect.mongo.replicator.event.OperationType;
 import org.apache.connect.mongo.replicator.event.ReplicationEvent;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-
 public class Filter {
 
     private Function<CollectionMeta, Boolean> dbAndCollectionFilter;
     private Map<String, List<String>> interestMap = new HashMap<>();
     private Function<OperationType, Boolean> notNoopFilter;
 
-
     public Filter(SourceTaskConfig sourceTaskConfig) {
 
         String interestDbAndCollection = sourceTaskConfig.getInterestDbAndCollection();
@@ -33,7 +30,6 @@ public class Filter {
                 interestMap.put(db, collections);
             }
 
-
         }
 
         dbAndCollectionFilter = (collectionMeta) -> {
@@ -56,13 +52,12 @@ public class Filter {
         notNoopFilter = (opeartionType) -> opeartionType.ordinal() != OperationType.NOOP.ordinal();
     }
 
-
     public boolean filterMeta(CollectionMeta collectionMeta) {
         return dbAndCollectionFilter.apply(collectionMeta);
     }
 
     public boolean filterEvent(ReplicationEvent event) {
         return dbAndCollectionFilter.apply(new CollectionMeta(event.getDatabaseName(), event.getCollectionName()))
-                && notNoopFilter.apply(event.getOperationType());
+            && notNoopFilter.apply(event.getOperationType());
     }
 }
diff --git a/src/main/java/org/apache/connect/mongo/replicator/MongoClientFactory.java b/src/main/java/org/apache/connect/mongo/replicator/MongoClientFactory.java
new file mode 100644
index 0000000..f5e01a3
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/replicator/MongoClientFactory.java
@@ -0,0 +1,113 @@
+package org.apache.connect.mongo.replicator;
+
+import com.mongodb.ConnectionString;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.connect.mongo.SourceTaskConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates {@link MongoClient} instances from the task configuration.
+ *
+ * <p>The factory assembles a {@code mongodb://} connection string out of the
+ * credentials and driver options held by {@link SourceTaskConfig} and the host
+ * and replica set name held by {@link ReplicaSetConfig}.
+ */
+public class MongoClientFactory {
+
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    /** Source of the credentials and driver options used for every client. */
+    private final SourceTaskConfig taskConfig;
+
+    /**
+     * @param sourceTaskConfig task configuration supplying credentials and driver options
+     */
+    public MongoClientFactory(SourceTaskConfig sourceTaskConfig) {
+        this.taskConfig = sourceTaskConfig;
+    }
+
+    /**
+     * Builds the connection string for the given replica set and opens a client for it.
+     *
+     * <p>Credentials are only added when both user name and password are non-blank. Every
+     * option is added only when its configured value is non-blank; the first option that
+     * is present is introduced with {@code ?} and the following ones with {@code &}, so the
+     * query part stays well formed even when no replica set name is configured.
+     *
+     * <p>The user name and password are inserted verbatim, so reserved characters such
+     * as {@code @} or {@code :} presumably have to be percent-encoded in the configuration
+     * already - confirm against the driver's connection string rules.
+     *
+     * <p>Side effects: a configured trust store and trust store password are published as
+     * the JVM-wide {@code javax.net.ssl.trustStore} and
+     * {@code javax.net.ssl.trustStorePassword} system properties.
+     *
+     * <p>Neither the MongoDB password nor the trust store password is written to the log.
+     *
+     * @param replicaSetConfig host list and replica set name to connect to
+     * @return a new client created from the assembled connection string
+     */
+    public MongoClient createMongoClient(ReplicaSetConfig replicaSetConfig) {
+        StringBuilder options = new StringBuilder();
+        appendOption(options, "replicaSet", replicaSetConfig.getReplicaSetName());
+        appendOption(options, "serverSelectionTimeoutMS", taskConfig.getServerSelectionTimeoutMS());
+        appendOption(options, "connectTimeoutMS", taskConfig.getConnectTimeoutMS());
+        appendOption(options, "socketTimeoutMS", taskConfig.getSocketTimeoutMS());
+        // NOTE(review): any non-blank ssl/tsl value turns SSL on, even "false"; this keeps the
+        // behaviour the connector has always had - confirm whether the value should be honoured.
+        if (StringUtils.isNotBlank(taskConfig.getSsl()) || StringUtils.isNotBlank(taskConfig.getTsl())) {
+            appendOption(options, "ssl", "true");
+        }
+        appendOption(options, "tlsInsecure", taskConfig.getTlsInsecure());
+        appendOption(options, "tlsAllowInvalidHostnames", taskConfig.getTlsAllowInvalidHostnames());
+        appendOption(options, "sslInvalidHostNameAllowed", taskConfig.getSslInvalidHostNameAllowed());
+        appendOption(options, "compressors", taskConfig.getCompressors());
+        appendOption(options, "zlibcompressionlevel", taskConfig.getZlibCompressionLevel());
+
+        if (StringUtils.isNotBlank(taskConfig.getTrustStore())) {
+            System.setProperty("javax.net.ssl.trustStore", taskConfig.getTrustStore());
+            logger.info("javax.net.ssl.trustStore: {}", taskConfig.getTrustStore());
+        }
+
+        if (StringUtils.isNotBlank(taskConfig.getTrustStorePassword())) {
+            System.setProperty("javax.net.ssl.trustStorePassword", taskConfig.getTrustStorePassword());
+            // Report that the password was applied without revealing it.
+            logger.info("javax.net.ssl.trustStorePassword has been set");
+        }
+
+        String userName = taskConfig.getMongoUserName();
+        String passWord = taskConfig.getMongoPassWord();
+        boolean authenticated = StringUtils.isNotBlank(userName) && StringUtils.isNotBlank(passWord);
+        String target = replicaSetConfig.getHost() + "/" + options;
+
+        // Log a copy of the connection string in which the password is masked.
+        logger.info("connection string :{}", "mongodb://" + (authenticated ? userName + ":***@" : "") + target);
+
+        String credentials = authenticated ? userName + ":" + passWord + "@" : "";
+        ConnectionString connectionString = new ConnectionString("mongodb://" + credentials + target);
+        return MongoClients.create(connectionString);
+    }
+
+    /**
+     * Appends {@code name=value} to the query part being built, unless the value is blank.
+     *
+     * <p>The value is appended verbatim, without URL encoding.
+     *
+     * @param options query part built so far; empty until the first option is appended
+     * @param name option name as understood by the MongoDB connection string
+     * @param value configured value, may be {@code null} or blank
+     */
+    private static void appendOption(StringBuilder options, String name, String value) {
+        if (StringUtils.isBlank(value)) {
+            return;
+        }
+        // The first option opens the query part, every later one is chained to it.
+        options.append(options.length() == 0 ? '?' : '&');
+        options.append(name).append('=').append(value);
+    }
+
+}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java
index b141c2b..8f4d0d8 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java
@@ -4,18 +4,16 @@ import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoCursor;
 import com.mongodb.client.MongoDatabase;
 import com.mongodb.client.MongoIterable;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.connect.mongo.replicator.Constants.MONGO_LOCAL_DATABASE;
 import static org.apache.connect.mongo.replicator.Constants.MONGO_OPLOG_RS;
 
-
 public class ReplicaSet {
 
     private Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -24,7 +22,6 @@ public class ReplicaSet {
 
     private ReplicaSetConfig replicaSetConfig;
 
-
     private ReplicaSetsContext replicaSetsContext;
 
     private MongoClient mongoClient;
@@ -56,7 +53,6 @@ public class ReplicaSet {
         }
     }
 
-
     public boolean isReplicaMongo() {
         MongoDatabase local = mongoClient.getDatabase(MONGO_LOCAL_DATABASE);
         MongoIterable<String> collectionNames = local.listCollectionNames();
@@ -82,7 +78,6 @@ public class ReplicaSet {
 
     }
 
-
     public void pause() {
         pause = true;
     }
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java
index 4b8d148..1b54b17 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java
@@ -1,18 +1,15 @@
 package org.apache.connect.mongo.replicator;
 
-import org.bson.BsonTimestamp;
-
 import java.util.Objects;
+import org.bson.BsonTimestamp;
 
 public class ReplicaSetConfig {
 
-
     private String shardName;
     private String replicaSetName;
     private String host;
     private Position position;
 
-
     public Position getPosition() {
         return position;
     }
@@ -55,13 +52,21 @@ public class ReplicaSetConfig {
         return new Position(0, 0, true);
     }
 
+    @Override
+    public String toString() {
+        return "ReplicaSetConfig{" +
+            "shardName='" + shardName + '\'' +
+            ", replicaSetName='" + replicaSetName + '\'' +
+            ", host='" + host + '\'' +
+            ", position=" + position +
+            '}';
+    }
 
     public class Position {
         private int timeStamp;
         private int inc;
         private boolean initSync;
 
-
         public int getTimeStamp() {
             return timeStamp;
         }
@@ -86,7 +91,6 @@ public class ReplicaSetConfig {
             this.initSync = initSync;
         }
 
-
         public Position(int timeStamp, int inc, boolean initSync) {
             this.timeStamp = timeStamp;
             this.inc = inc;
@@ -104,20 +108,22 @@ public class ReplicaSetConfig {
         @Override
         public String toString() {
             return "Position{" +
-                    "timeStamp=" + timeStamp +
-                    ", inc=" + inc +
-                    ", initSync=" + initSync +
-                    '}';
+                "timeStamp=" + timeStamp +
+                ", inc=" + inc +
+                ", initSync=" + initSync +
+                '}';
         }
 
         @Override
         public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
             Position position = (Position) o;
             return timeStamp == position.timeStamp &&
-                    inc == position.inc &&
-                    initSync == position.initSync;
+                inc == position.inc &&
+                initSync == position.initSync;
         }
 
         @Override
@@ -125,15 +131,4 @@ public class ReplicaSetConfig {
             return Objects.hash(timeStamp, inc, initSync);
         }
     }
-
-
-    @Override
-    public String toString() {
-        return "ReplicaSetConfig{" +
-                "shardName='" + shardName + '\'' +
-                ", replicaSetName='" + replicaSetName + '\'' +
-                ", host='" + host + '\'' +
-                ", position=" + position +
-                '}';
-    }
 }
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSets.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSets.java
index 9184b90..af1ebeb 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSets.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSets.java
@@ -1,23 +1,20 @@
 package org.apache.connect.mongo.replicator;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.Validate;
-
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.Validate;
 
 public class ReplicaSets {
 
     private static final Pattern HOST_PATTERN = Pattern.compile("((([^=]+)[=])?(([^/]+)\\/))?(.+)");
 
-
     private final Map<String, ReplicaSetConfig> replicaConfigByName = new HashMap<>();
 
-
     public ReplicaSets(Set<ReplicaSetConfig> replicaSetConfigs) {
         replicaSetConfigs.forEach(replicaSetConfig -> {
             if (StringUtils.isNotBlank(replicaSetConfig.getReplicaSetName())) {
@@ -43,7 +40,6 @@ public class ReplicaSets {
         return new ReplicaSets(replicaSetConfigs);
     }
 
-
     private static ReplicaSetConfig parseReplicaSetStr(String hosts) {
         if (hosts != null) {
             Matcher matcher = HOST_PATTERN.matcher(hosts);
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java
index f66ca14..e599f5b 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java
@@ -1,15 +1,7 @@
 package org.apache.connect.mongo.replicator;
 
-import com.mongodb.ConnectionString;
 import com.mongodb.client.MongoClient;
-import com.mongodb.client.MongoClients;
 import io.openmessaging.connector.api.data.SourceDataEntry;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.connect.mongo.SourceTaskConfig;
-import org.apache.connect.mongo.connector.builder.MongoDataEntry;
-import org.apache.connect.mongo.initsync.CollectionMeta;
-import org.apache.connect.mongo.replicator.event.ReplicationEvent;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -17,6 +9,10 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.connect.mongo.SourceTaskConfig;
+import org.apache.connect.mongo.connector.builder.MongoDataEntry;
+import org.apache.connect.mongo.initsync.CollectionMeta;
+import org.apache.connect.mongo.replicator.event.ReplicationEvent;
 
 public class ReplicaSetsContext {
 
@@ -30,57 +26,36 @@ public class ReplicaSetsContext {
 
     private Filter filter;
 
+    private MongoClientFactory mongoClientFactory;
+
     public ReplicaSetsContext(SourceTaskConfig taskConfig) {
         this.taskConfig = taskConfig;
         this.replicaSets = new CopyOnWriteArrayList<>();
         this.dataEntryQueue = new LinkedBlockingDeque<>();
         this.filter = new Filter(taskConfig);
+        this.mongoClientFactory = new MongoClientFactory(taskConfig);
     }
 
-
     public MongoClient createMongoClient(ReplicaSetConfig replicaSetConfig) {
-        StringBuilder sb = new StringBuilder();
-        sb.append("mongodb://");
-        if (StringUtils.isNotBlank(taskConfig.getMongoUserName())
-                && StringUtils.isNotBlank(taskConfig.getMongoPassWord())) {
-            sb.append(taskConfig.getMongoUserName());
-            sb.append(":");
-            sb.append(taskConfig.getMongoPassWord());
-            sb.append("@");
-
-        }
-        sb.append(replicaSetConfig.getHost());
-        sb.append("/");
-        if (StringUtils.isNotBlank(replicaSetConfig.getReplicaSetName())) {
-            sb.append("?");
-            sb.append("replicaSet=");
-            sb.append(replicaSetConfig.getReplicaSetName());
-        }
-        ConnectionString connectionString = new ConnectionString(sb.toString());
-        return MongoClients.create(connectionString);
+        return mongoClientFactory.createMongoClient(replicaSetConfig);
     }
 
-
     public boolean filterEvent(ReplicationEvent event) {
         return filter.filterEvent(event);
     }
 
-
     public boolean filterMeta(CollectionMeta collectionMeta) {
         return filter.filterMeta(collectionMeta);
     }
 
-
     public int getCopyThread() {
         return taskConfig.getCopyThread() > 0 ? taskConfig.getCopyThread() : Runtime.getRuntime().availableProcessors();
     }
 
-
     public void addReplicaSet(ReplicaSet replicaSet) {
         this.replicaSets.add(replicaSet);
     }
 
-
     public void shutdown() {
         replicaSets.forEach(ReplicaSet::shutdown);
     }
@@ -89,12 +64,10 @@ public class ReplicaSetsContext {
         replicaSets.forEach(ReplicaSet::pause);
     }
 
-
     public void resume() {
         replicaSets.forEach(ReplicaSet::resume);
     }
 
-
     public void publishEvent(ReplicationEvent event, ReplicaSetConfig replicaSetConfig) {
         SourceDataEntry sourceDataEntry = MongoDataEntry.createSouceDataEntry(event, replicaSetConfig);
         while (true) {
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
index bf4ebac..6cb46d1 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
@@ -13,7 +13,6 @@ import org.bson.Document;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class ReplicatorTask implements Runnable {
 
     private Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -26,7 +25,8 @@ public class ReplicatorTask implements Runnable {
 
     private ReplicaSetsContext replicaSetsContext;
 
-    public ReplicatorTask(ReplicaSet replicaSet, MongoClient mongoClient, ReplicaSetConfig replicaSetConfig, ReplicaSetsContext replicaSetsContext) {
+    public ReplicatorTask(ReplicaSet replicaSet, MongoClient mongoClient, ReplicaSetConfig replicaSetConfig,
+        ReplicaSetsContext replicaSetsContext) {
         this.replicaSet = replicaSet;
         this.replicaSetConfig = replicaSetConfig;
         this.mongoClient = mongoClient;
@@ -45,15 +45,15 @@ public class ReplicatorTask implements Runnable {
         FindIterable<Document> iterable;
         if (replicaSetConfig.getPosition().isValid()) {
             iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find(
-                    Filters.gt("ts", replicaSetConfig.getPosition().converBsonTimeStamp()));
+                Filters.gt("ts", replicaSetConfig.getPosition().converBsonTimeStamp()));
         } else {
             iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find();
         }
         MongoCursor<Document> cursor = iterable.sort(new Document("$natural", 1))
-                .noCursorTimeout(true)
-                .cursorType(CursorType.TailableAwait)
-                .batchSize(200)
-                .iterator();
+            .noCursorTimeout(true)
+            .cursorType(CursorType.TailableAwait)
+            .batchSize(200)
+            .iterator();
 
         while (replicaSet.isRuning()) {
             try {
@@ -70,7 +70,6 @@ public class ReplicatorTask implements Runnable {
         logger.info("replicaSet:{}, already shutdown, replicaTask end of life cycle", replicaSetConfig);
     }
 
-
     private void executorCursor(MongoCursor<Document> cursor) {
         while (cursor.hasNext() && !replicaSet.isPause()) {
             Document document = cursor.next();
@@ -81,5 +80,4 @@ public class ReplicatorTask implements Runnable {
         }
     }
 
-
 }
diff --git a/src/main/java/org/apache/connect/mongo/replicator/event/EventConverter.java b/src/main/java/org/apache/connect/mongo/replicator/event/EventConverter.java
index 57b4ac8..1b48990 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/event/EventConverter.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/event/EventConverter.java
@@ -1,16 +1,19 @@
 package org.apache.connect.mongo.replicator.event;
 
+import java.util.Optional;
 import org.bson.BsonTimestamp;
 import org.bson.Document;
 
-import java.util.Optional;
-
-import static org.apache.connect.mongo.replicator.Constants.*;
-
+import static org.apache.connect.mongo.replicator.Constants.HASH;
+import static org.apache.connect.mongo.replicator.Constants.NAMESPACE;
+import static org.apache.connect.mongo.replicator.Constants.OBJECTID;
+import static org.apache.connect.mongo.replicator.Constants.OPERATION;
+import static org.apache.connect.mongo.replicator.Constants.OPERATIONTYPE;
+import static org.apache.connect.mongo.replicator.Constants.TIMESTAMP;
+import static org.apache.connect.mongo.replicator.Constants.VERSION;
 
 public class EventConverter {
 
-
     public static ReplicationEvent convert(Document document, String replicaSetName) {
 
         ReplicationEvent event = new ReplicationEvent();
diff --git a/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java b/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java
index 283e9d6..6407781 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java
@@ -1,12 +1,11 @@
 package org.apache.connect.mongo.replicator.event;
 
 import io.openmessaging.connector.api.data.EntryType;
+import java.util.Optional;
 import org.apache.commons.lang3.StringUtils;
 import org.bson.BsonTimestamp;
 import org.bson.Document;
 
-import java.util.Optional;
-
 public class ReplicationEvent {
 
     private Document document;
@@ -21,13 +20,12 @@ public class ReplicationEvent {
     private Optional<Document> objectId;
     private String replicaSetName;
 
-
     public ReplicationEvent() {
 
     }
 
-
-    public ReplicationEvent(OperationType operationType, BsonTimestamp timestamp, Integer v, Long h, String namespace, Optional<Document> eventData, Optional<Document> objectId, Document document) {
+    public ReplicationEvent(OperationType operationType, BsonTimestamp timestamp, Integer v, Long h, String namespace,
+        Optional<Document> eventData, Optional<Document> objectId, Document document) {
         this.operationType = operationType;
         this.v = v;
         this.h = h;
@@ -41,7 +39,6 @@ public class ReplicationEvent {
         this.document = document;
     }
 
-
     public OperationType getOperationType() {
         return operationType;
     }
@@ -91,12 +88,10 @@ public class ReplicationEvent {
         }
     }
 
-
     public void setOperationType(OperationType operationType) {
         this.operationType = operationType;
     }
 
-
     public Document getDocument() {
         return document;
     }
@@ -137,7 +132,6 @@ public class ReplicationEvent {
         this.objectId = objectId;
     }
 
-
     public void setReplicaSetName(String replicaSetName) {
         this.replicaSetName = replicaSetName;
     }
@@ -149,17 +143,17 @@ public class ReplicationEvent {
     @Override
     public String toString() {
         return "ReplicationEvent{" +
-                "document=" + document +
-                ", operationType=" + operationType +
-                ", v=" + v +
-                ", h=" + h +
-                ", timestamp=" + timestamp +
-                ", databaseName='" + databaseName + '\'' +
-                ", collectionName='" + collectionName + '\'' +
-                ", namespace='" + namespace + '\'' +
-                ", eventData=" + eventData +
-                ", objectId=" + objectId +
-                ", replicaSetName='" + replicaSetName + '\'' +
-                '}';
+            "document=" + document +
+            ", operationType=" + operationType +
+            ", v=" + v +
+            ", h=" + h +
+            ", timestamp=" + timestamp +
+            ", databaseName='" + databaseName + '\'' +
+            ", collectionName='" + collectionName + '\'' +
+            ", namespace='" + namespace + '\'' +
+            ", eventData=" + eventData +
+            ", objectId=" + objectId +
+            ", replicaSetName='" + replicaSetName + '\'' +
+            '}';
     }
 }
diff --git a/src/test/java/org/apache/connect/mongo/FilterTest.java b/src/test/java/org/apache/connect/mongo/FilterTest.java
index 60e2514..a804b8b 100644
--- a/src/test/java/org/apache/connect/mongo/FilterTest.java
+++ b/src/test/java/org/apache/connect/mongo/FilterTest.java
@@ -1,6 +1,10 @@
 package org.apache.connect.mongo;
 
 import com.alibaba.fastjson.JSONObject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.connect.mongo.initsync.CollectionMeta;
 import org.apache.connect.mongo.replicator.Filter;
 import org.apache.connect.mongo.replicator.event.OperationType;
@@ -9,14 +13,8 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 public class FilterTest {
 
-
     private SourceTaskConfig sourceTaskConfig;
     private Map<String, List<String>> insterest;
 
@@ -37,7 +35,6 @@ public class FilterTest {
         Assert.assertFalse(filter.filterMeta(new CollectionMeta("test", "person01")));
     }
 
-
     @Test
     public void testBlankDb() {
         Filter filter = new Filter(sourceTaskConfig);
@@ -45,7 +42,6 @@ public class FilterTest {
         Assert.assertTrue(filter.filterMeta(new CollectionMeta("test1", "test01")));
     }
 
-
     @Test
     public void testAsterisk() {
         List<String> collections = new ArrayList<>();
@@ -57,8 +53,6 @@ public class FilterTest {
         Assert.assertTrue(filter.filterMeta(new CollectionMeta("test", "tests032")));
     }
 
-
-
     @Test
     public void testFilterEvent() {
         Filter filter = new Filter(sourceTaskConfig);
@@ -69,5 +63,4 @@ public class FilterTest {
         Assert.assertTrue(filter.filterEvent(replicationEvent));
     }
 
-
 }
diff --git a/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java b/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java
new file mode 100644
index 0000000..93adeeb
--- /dev/null
+++ b/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java
@@ -0,0 +1,173 @@
+package org.apache.connect.mongo;
+
+import com.mongodb.MongoClientSettings;
+import com.mongodb.MongoTimeoutException;
+import com.mongodb.client.internal.MongoClientImpl;
+import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.connect.mongo.replicator.MongoClientFactory;
+import org.apache.connect.mongo.replicator.ReplicaSetConfig;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that {@link MongoClientFactory} carries the options set on {@link SourceTaskConfig}
+ * over to the {@link MongoClientSettings} of the client it creates.
+ *
+ * <p>The settings are read back reflectively from the private {@code settings} field of
+ * {@link MongoClientImpl}. No test issues a database command; only the client configuration
+ * is inspected.
+ */
+public class MongoFactoryTest {
+
+    private ReplicaSetConfig replicaSetConfig;
+
+    private SourceTaskConfig sourceTaskConfig;
+
+    private MongoClientFactory mongoClientFactory;
+
+    /** Client created by the most recent {@link #getSettings()} call, or null if none is open. */
+    private MongoClientImpl client;
+
+    @Before
+    public void before() {
+        this.replicaSetConfig = new ReplicaSetConfig("shardName1", "rep1", "127.0.0.1:27027");
+        this.sourceTaskConfig = new SourceTaskConfig();
+        this.mongoClientFactory = new MongoClientFactory(sourceTaskConfig);
+    }
+
+    @After
+    public void after() {
+        // A test may fail before any client exists, so the close must tolerate null.
+        closeClient();
+    }
+
+    @Test
+    public void testCreateMongoClientWithSSL() {
+        sourceTaskConfig.setSsl("ssl");
+        Assert.assertTrue(getSettings().getSslSettings().isEnabled());
+    }
+
+    @Test
+    public void testCreateMongoClientWithTSL() {
+        sourceTaskConfig.setTsl("tsl");
+        Assert.assertTrue(getSettings().getSslSettings().isEnabled());
+    }
+
+    @Test
+    public void testCreateMongoClientWithserverSelectionTimeoutMS() {
+        try {
+            replicaSetConfig.setReplicaSetName("testReplicatSet");
+            sourceTaskConfig.setServerSelectionTimeoutMS("150");
+            Assert.assertEquals(150,
+                getSettings().getClusterSettings().getServerSelectionTimeout(TimeUnit.MILLISECONDS));
+        } catch (MongoTimeoutException exception) {
+            // Tolerated in case server selection is attempted while the client is created; the
+            // driver's message then reports the configured timeout, which is 150 ms here.
+            Assert.assertTrue(StringUtils.startsWith(exception.getMessage(),
+                "Timed out after 150 ms while waiting for a server that matches"));
+        }
+    }
+
+    @Test
+    public void testCreateMongoClientWithConnectTimeoutMS() {
+        sourceTaskConfig.setConnectTimeoutMS("1200");
+        Assert.assertEquals(1200, getSettings().getSocketSettings().getConnectTimeout(TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    public void testCreateMongoClientWithSocketTimeoutMS() {
+        sourceTaskConfig.setSocketTimeoutMS("1100");
+        Assert.assertEquals(1100, getSettings().getSocketSettings().getReadTimeout(TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    public void testCreateMongoClientWithInvalidHostNameAllowed() {
+        sourceTaskConfig.setSslInvalidHostNameAllowed("true");
+        Assert.assertTrue(getSettings().getSslSettings().isInvalidHostNameAllowed());
+
+        sourceTaskConfig.setSslInvalidHostNameAllowed("false");
+        Assert.assertFalse(getSettings().getSslSettings().isInvalidHostNameAllowed());
+    }
+
+    @Test
+    public void testCreateMongoClientWithInvalidHostNameAllowedTsl() {
+        sourceTaskConfig.setTlsAllowInvalidHostnames("true");
+        Assert.assertTrue(getSettings().getSslSettings().isInvalidHostNameAllowed());
+
+        sourceTaskConfig.setTlsAllowInvalidHostnames("false");
+        Assert.assertFalse(getSettings().getSslSettings().isInvalidHostNameAllowed());
+    }
+
+    @Test
+    public void testCreateMongoClientWithTlsinsecure() {
+        sourceTaskConfig.setTlsInsecure("true");
+        Assert.assertTrue(getSettings().getSslSettings().isInvalidHostNameAllowed());
+
+        sourceTaskConfig.setTlsInsecure("false");
+        Assert.assertFalse(getSettings().getSslSettings().isInvalidHostNameAllowed());
+    }
+
+    @Test
+    public void testCreateMongoClientWithCompression() {
+        sourceTaskConfig.setCompressors("zlib");
+        Assert.assertEquals("zlib", getSettings().getCompressorList().get(0).getName());
+    }
+
+    @Test
+    public void testCreateMongoClientWithCompressionLevel() {
+        sourceTaskConfig.setCompressors("zlib");
+        sourceTaskConfig.setZlibCompressionLevel("7");
+        MongoClientSettings settings = getSettings();
+        Assert.assertEquals("zlib", settings.getCompressorList().get(0).getName());
+        int level = settings.getCompressorList().get(0).getProperty("level", 0);
+        Assert.assertEquals(7, level);
+    }
+
+    @Test
+    public void testCreateMongoClientWithAuth() {
+        sourceTaskConfig.setMongoUserName("test");
+        sourceTaskConfig.setMongoPassWord("123456");
+        MongoClientSettings settings = getSettings();
+        Assert.assertEquals("admin", settings.getCredential().getSource());
+        Assert.assertEquals("test", settings.getCredential().getUserName());
+        Assert.assertArrayEquals("123456".toCharArray(), settings.getCredential().getPassword());
+    }
+
+    /**
+     * Creates a client from the current {@code sourceTaskConfig} and {@code replicaSetConfig} and
+     * returns the settings it was built with.
+     *
+     * <p>Any client left open by an earlier call is closed first, so a test that calls this
+     * several times does not leak clients. Runtime failures raised while creating the client
+     * propagate unchanged rather than surfacing later as a {@code NullPointerException}.
+     *
+     * @return the settings held by the newly created client
+     * @throws AssertionError if the {@code settings} field cannot be read reflectively
+     */
+    private MongoClientSettings getSettings() {
+        closeClient();
+        client = (MongoClientImpl) mongoClientFactory.createMongoClient(replicaSetConfig);
+        try {
+            Field field = MongoClientImpl.class.getDeclaredField("settings");
+            field.setAccessible(true);
+            return (MongoClientSettings) field.get(client);
+        } catch (ReflectiveOperationException e) {
+            throw new AssertionError("unable to read MongoClientImpl.settings via reflection", e);
+        }
+    }
+
+    /** Closes the currently open client, if any, and forgets it. */
+    private void closeClient() {
+        if (client != null) {
+            client.close();
+            client = null;
+        }
+    }
+
+//    @Test
+//    public void testSSLTrustStore() {
+//        sourceTaskConfig.setMongoUserName("user_test");
+//        sourceTaskConfig.setMongoPassWord("pwd_test");
+//        sourceTaskConfig.setSsl("ssl");
+//        sourceTaskConfig.setSslInvalidHostNameAllowed("true");
+//        sourceTaskConfig.setTrustStore("/Users/liping/test.pem");
+//        sourceTaskConfig.setTrustStorePassword("test001");
+//        sourceTaskConfig.setServerSelectionTimeoutMS("10000");
+//        MongoClient client = mongoClientFactory.createMongoClient(replicaSetConfig);
+//        MongoCollection<Document> collection = client.getDatabase("test").getCollection("person");
+//        Document document = new Document();
+//        document.put("name", "liping");
+//        collection.insertOne(document);
+//        MongoCursor<Document> iterator = collection.find().iterator();
+//        while (iterator.hasNext()) {
+//            System.out.println(iterator.next());
+//        }
+//
+//    }
+
+}
diff --git a/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java b/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java
index 1f6227b..f85e4a9 100644
--- a/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java
+++ b/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java
@@ -5,6 +5,11 @@ import io.openmessaging.connector.api.data.EntryType;
 import io.openmessaging.connector.api.data.Schema;
 import io.openmessaging.connector.api.data.SourceDataEntry;
 import io.openmessaging.internal.DefaultKeyValue;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.connect.mongo.connector.MongoSourceConnector;
 import org.apache.connect.mongo.connector.MongoSourceTask;
 import org.apache.connect.mongo.replicator.ReplicaSetConfig;
@@ -17,12 +22,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.lang.reflect.Field;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.LinkedBlockingQueue;
-
 public class MongoSourceConnectorTest {
 
     private MongoSourceConnector mongoSourceConnector;
@@ -42,14 +41,12 @@ public class MongoSourceConnectorTest {
         Assert.assertEquals(mongoSourceConnector.taskClass(), MongoSourceTask.class);
     }
 
-
     @Test
     public void verifyConfig() {
         String s = mongoSourceConnector.verifyAndSetConfig(keyValue);
         Assert.assertTrue(s.contains("Request config key:"));
     }
 
-
     @Test
     public void testPoll() throws Exception {
         LinkedBlockingQueue<SourceDataEntry> entries = new LinkedBlockingQueue<>();
@@ -65,25 +62,22 @@ public class MongoSourceConnectorTest {
         event.setH(324243242L);
         event.setEventData(Optional.ofNullable(new Document("testEventKey", "testEventValue")));
         event.setObjectId(Optional.empty());
-        context.publishEvent(event, new ReplicaSetConfig("", "testReplicaName", "localhost:27017"));
+        context.publishEvent(event, new ReplicaSetConfig("", "testReplicaName", "localhost:27027"));
         List<SourceDataEntry> sourceDataEntries = (List<SourceDataEntry>) context.poll();
         Assert.assertTrue(sourceDataEntries.size() == 1);
 
         SourceDataEntry sourceDataEntry = sourceDataEntries.get(0);
         Assert.assertEquals("test-person", sourceDataEntry.getQueueName());
 
-
         ByteBuffer sourcePartition = sourceDataEntry.getSourcePartition();
         Assert.assertEquals("testReplicaName", new String(sourcePartition.array()));
 
-
         ByteBuffer sourcePosition = sourceDataEntry.getSourcePosition();
         ReplicaSetConfig.Position position = JSONObject.parseObject(new String(sourcePosition.array()), ReplicaSetConfig.Position.class);
         Assert.assertEquals(position.getTimeStamp(), 1565609506);
         Assert.assertEquals(position.getInc(), 1);
         Assert.assertEquals(position.isInitSync(), false);
 
-
         EntryType entryType = sourceDataEntry.getEntryType();
         Assert.assertEquals(EntryType.CREATE, entryType);
 
@@ -97,5 +91,4 @@ public class MongoSourceConnectorTest {
 
     }
 
-
 }
diff --git a/src/test/java/org/apache/connect/mongo/MongoSourceTaskTest.java b/src/test/java/org/apache/connect/mongo/MongoSourceTaskTest.java
new file mode 100644
index 0000000..b696393
--- /dev/null
+++ b/src/test/java/org/apache/connect/mongo/MongoSourceTaskTest.java
@@ -0,0 +1,143 @@
+package org.apache.connect.mongo;
+
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.PositionStorageReader;
+import io.openmessaging.connector.api.source.SourceTask;
+import io.openmessaging.connector.api.source.SourceTaskContext;
+import io.openmessaging.internal.DefaultKeyValue;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.connect.mongo.connector.MongoSourceTask;
+import org.apache.connect.mongo.replicator.Constants;
+import org.apache.connect.mongo.replicator.ReplicaSet;
+import org.apache.connect.mongo.replicator.ReplicaSetConfig;
+import org.apache.connect.mongo.replicator.ReplicaSetsContext;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests which replication start position a started {@link MongoSourceTask} ends up with: the
+ * one given in the task configuration when the position storage returns nothing, and the stored
+ * one when the position storage returns a serialized position.
+ *
+ * <p>The task's private state is inspected through reflection.
+ */
+public class MongoSourceTaskTest {
+
+    @Test
+    public void testEmptyContextStart() throws NoSuchFieldException, IllegalAccessException {
+        DefaultKeyValue defaultKeyValue = new DefaultKeyValue();
+        defaultKeyValue.put("mongoAddr", "test/127.0.0.1:27027");
+        defaultKeyValue.put("positionTimeStamp", "11111111");
+        defaultKeyValue.put("positionInc", "111");
+        defaultKeyValue.put("serverSelectionTimeoutMS", "10");
+        defaultKeyValue.put("dataSync", Constants.INITSYNC);
+
+        ReplicaSetConfig config = startTaskAndGetReplicaSetConfig(defaultKeyValue, emptyTaskContext());
+
+        assertReplicaSetAddress(config);
+        Assert.assertEquals(11111111, config.getPosition().getTimeStamp());
+        Assert.assertEquals(111, config.getPosition().getInc());
+        Assert.assertTrue(config.getPosition().isInitSync());
+    }
+
+    @Test
+    public void testContextStart() throws NoSuchFieldException, IllegalAccessException {
+        DefaultKeyValue defaultKeyValue = new DefaultKeyValue();
+        defaultKeyValue.put("mongoAddr", "test/127.0.0.1:27027");
+        defaultKeyValue.put("serverSelectionTimeoutMS", "10");
+
+        ReplicaSetConfig config = startTaskAndGetReplicaSetConfig(defaultKeyValue, storedPositionTaskContext());
+
+        assertReplicaSetAddress(config);
+        Assert.assertEquals(22222222, config.getPosition().getTimeStamp());
+        Assert.assertEquals(222, config.getPosition().getInc());
+        Assert.assertFalse(config.getPosition().isInitSync());
+    }
+
+    /** Asserts the replica set name and host parsed from the {@code mongoAddr} used by both tests. */
+    private static void assertReplicaSetAddress(ReplicaSetConfig config) {
+        Assert.assertTrue(StringUtils.equals(config.getReplicaSetName(), "test"));
+        Assert.assertTrue(StringUtils.equals(config.getHost(), "127.0.0.1:27027"));
+    }
+
+    /**
+     * Starts a fresh {@link MongoSourceTask} with the given configuration and task context and
+     * returns the config of the single replica set it registered.
+     *
+     * @param taskConfig key/value configuration handed to {@code start}
+     * @param taskContext context injected into the task's {@code context} field before starting
+     * @return the {@link ReplicaSetConfig} held by the only registered {@link ReplicaSet}
+     */
+    private static ReplicaSetConfig startTaskAndGetReplicaSetConfig(DefaultKeyValue taskConfig,
+        SourceTaskContext taskContext) throws NoSuchFieldException, IllegalAccessException {
+        MongoSourceTask mongoSourceTask = new MongoSourceTask();
+
+        // The context is injected reflectively into the field declared on SourceTask.
+        Field context = SourceTask.class.getDeclaredField("context");
+        context.setAccessible(true);
+        context.set(mongoSourceTask, taskContext);
+        mongoSourceTask.start(taskConfig);
+        // NOTE(review): the started task is never stopped here; confirm whether it needs an
+        // explicit shutdown to avoid leaving replication threads behind.
+
+        ReplicaSetsContext setsContext =
+            (ReplicaSetsContext) readField(MongoSourceTask.class, "replicaSetsContext", mongoSourceTask);
+        @SuppressWarnings("unchecked") // ReplicaSetsContext only ever stores ReplicaSet instances in this list
+        List<ReplicaSet> replicaSetList =
+            (List<ReplicaSet>) readField(ReplicaSetsContext.class, "replicaSets", setsContext);
+        Assert.assertEquals(1, replicaSetList.size());
+
+        return (ReplicaSetConfig) readField(ReplicaSet.class, "replicaSetConfig", replicaSetList.get(0));
+    }
+
+    /** Reads the value of the declared (possibly private) field {@code name} of {@code owner} from {@code target}. */
+    private static Object readField(Class<?> owner, String name, Object target)
+        throws NoSuchFieldException, IllegalAccessException {
+        Field field = owner.getDeclaredField(name);
+        field.setAccessible(true);
+        return field.get(target);
+    }
+
+    /** Task context whose position storage has no stored position. */
+    private SourceTaskContext emptyTaskContext() {
+        return taskContextWithPosition(null);
+    }
+
+    /** Task context whose position storage returns timeStamp=22222222, inc=222, initSync=false as JSON. */
+    private SourceTaskContext storedPositionTaskContext() {
+        Map<String, Object> po = new HashMap<>();
+        po.put("timeStamp", 22222222);
+        po.put("inc", 222);
+        po.put("initSync", false);
+        return taskContextWithPosition(JSONObject.toJSONString(po).getBytes(java.nio.charset.StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Builds a stub {@link SourceTaskContext} whose {@code getPosition} returns a new buffer over
+     * {@code storedPosition} on each call, or null when {@code storedPosition} is null.
+     */
+    private static SourceTaskContext taskContextWithPosition(final byte[] storedPosition) {
+        return new SourceTaskContext() {
+            @Override
+            public PositionStorageReader positionStorageReader() {
+                return new PositionStorageReader() {
+                    @Override
+                    public ByteBuffer getPosition(ByteBuffer partition) {
+                        return storedPosition == null ? null : ByteBuffer.wrap(storedPosition);
+                    }
+
+                    @Override
+                    public Map<ByteBuffer, ByteBuffer> getPositions(Collection<ByteBuffer> partitions) {
+                        return null;
+                    }
+                };
+            }
+
+            @Override
+            public KeyValue configs() {
+                return null;
+            }
+        };
+    }
+}
diff --git a/src/test/java/org/apache/connect/mongo/MongoTest.java b/src/test/java/org/apache/connect/mongo/MongoTest.java
index 7b0291a..98e9a42 100644
--- a/src/test/java/org/apache/connect/mongo/MongoTest.java
+++ b/src/test/java/org/apache/connect/mongo/MongoTest.java
@@ -9,6 +9,14 @@ import com.mongodb.client.MongoCollection;
 import io.openmessaging.connector.api.data.EntryType;
 import io.openmessaging.connector.api.data.Schema;
 import io.openmessaging.connector.api.data.SourceDataEntry;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.connect.mongo.initsync.InitSync;
 import org.apache.connect.mongo.replicator.Constants;
 import org.apache.connect.mongo.replicator.ReplicaSet;
@@ -23,11 +31,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.lang.reflect.Field;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 public class MongoTest {
 
     private MongoClient mongoClient;
@@ -35,11 +38,10 @@ public class MongoTest {
     @Before
     public void before() {
         MongoClientSettings.Builder builder = MongoClientSettings.builder();
-        builder.applyConnectionString(new ConnectionString("mongodb://127.0.0.1:27077"));
+        builder.applyConnectionString(new ConnectionString("mongodb://127.0.0.1:27027"));
         mongoClient = MongoClients.create(builder.build());
     }
 
-
     @Test
     public void testConvertEvent() {
         Document oplog = new Document();
@@ -60,10 +62,8 @@ public class MongoTest {
         Assert.assertEquals(document, event.getEventData().get());
         Assert.assertEquals("testR", event.getReplicaSetName());
 
-
     }
 
-
     @Test
     public void testInitSyncCopy() throws NoSuchFieldException, IllegalAccessException, InterruptedException {
         MongoCollection<Document> collection = mongoClient.getDatabase("test").getCollection("person");
diff --git a/src/test/java/org/apache/connect/mongo/ReplicaContextTest.java b/src/test/java/org/apache/connect/mongo/ReplicaContextTest.java
index 8613c42..16cb959 100644
--- a/src/test/java/org/apache/connect/mongo/ReplicaContextTest.java
+++ b/src/test/java/org/apache/connect/mongo/ReplicaContextTest.java
@@ -20,10 +20,9 @@ public class ReplicaContextTest {
         context = new ReplicaSetsContext(sourceTaskConfig);
     }
 
-
     @Test
     public void testCreateMongoClient() {
-        MongoClient mongoClient = context.createMongoClient(new ReplicaSetConfig("shardName1", "rep1", "127.0.0.1:27017"));
+        MongoClient mongoClient = context.createMongoClient(new ReplicaSetConfig("shardName1", "", "127.0.0.1:27027"));
         MongoIterable<String> collectionNames = mongoClient.getDatabase("local").listCollectionNames();
         MongoCursor<String> iterator = collectionNames.iterator();
         while (iterator.hasNext()) {
diff --git a/src/test/java/org/apache/connect/mongo/ReplicaSetTest.java b/src/test/java/org/apache/connect/mongo/ReplicaSetTest.java
new file mode 100644
index 0000000..07eefae
--- /dev/null
+++ b/src/test/java/org/apache/connect/mongo/ReplicaSetTest.java
@@ -0,0 +1,76 @@
+package org.apache.connect.mongo;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.connect.mongo.replicator.ReplicaSet;
+import org.apache.connect.mongo.replicator.ReplicaSetConfig;
+import org.apache.connect.mongo.replicator.ReplicaSetsContext;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Lifecycle tests for {@link ReplicaSet}: start/shutdown and pause/resume.
+ *
+ * <p>The flags being checked are internal fields of ReplicaSet, so they are read via reflection.
+ */
+public class ReplicaSetTest {
+
+    private ReplicaSet replicaSet;
+
+    private SourceTaskConfig sourceTaskConfig;
+
+    private ReplicaSetConfig replicaSetConfig;
+
+    private ReplicaSetsContext replicaSetsContext;
+
+    /** Builds a fresh ReplicaSet from a default SourceTaskConfig before every test. */
+    @Before
+    public void before() {
+        this.sourceTaskConfig = new SourceTaskConfig();
+        this.replicaSetConfig = new ReplicaSetConfig("shardName1", "", "127.0.0.1:27027");
+        this.replicaSetsContext = new ReplicaSetsContext(sourceTaskConfig);
+        this.replicaSet = new ReplicaSet(replicaSetConfig, replicaSetsContext);
+    }
+
+    /** start() must set the "running" flag and shutdown() must clear it. */
+    @Test
+    public void testStartAndShutDown() throws NoSuchFieldException, IllegalAccessException {
+        replicaSet.start();
+        AtomicBoolean running;
+        try {
+            running = (AtomicBoolean) readInternalField("running");
+            Assert.assertTrue(running.get());
+        } finally {
+            // Shut down even when the check above fails, so the started replica set is not left running.
+            replicaSet.shutdown();
+        }
+        Assert.assertFalse(running.get());
+    }
+
+    /** pause() must set the "pause" flag. */
+    @Test
+    public void testPause() throws Exception {
+        replicaSet.pause();
+        Assert.assertTrue((boolean) readInternalField("pause"));
+    }
+
+    /** resume() must clear the "pause" flag that a preceding pause() set. */
+    @Test
+    public void testResume() throws Exception {
+        // Pause first and verify it took effect; resuming a never-paused instance would not
+        // demonstrate that resume() itself changes the flag.
+        replicaSet.pause();
+        Assert.assertTrue((boolean) readInternalField("pause"));
+        replicaSet.resume();
+        Assert.assertFalse((boolean) readInternalField("pause"));
+    }
+
+    /** Reads the named declared field of the ReplicaSet under test via reflection. */
+    private Object readInternalField(String name) throws NoSuchFieldException, IllegalAccessException {
+        Field field = ReplicaSet.class.getDeclaredField(name);
+        field.setAccessible(true);
+        return field.get(replicaSet);
+    }
+
+}
diff --git a/src/test/java/org/apache/connect/mongo/ReplicaSetsTest.java b/src/test/java/org/apache/connect/mongo/ReplicaSetsTest.java
index e69eac6..5276f4f 100644
--- a/src/test/java/org/apache/connect/mongo/ReplicaSetsTest.java
+++ b/src/test/java/org/apache/connect/mongo/ReplicaSetsTest.java
@@ -1,29 +1,25 @@
 package org.apache.connect.mongo;
 
+import java.util.Map;
 import org.apache.connect.mongo.replicator.ReplicaSetConfig;
 import org.apache.connect.mongo.replicator.ReplicaSets;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.Map;
-
 public class ReplicaSetsTest {
 
-
     @Test(expected = IllegalArgumentException.class)
-    public void testCreatReplicaSetsException01() {
+    public void testCreatReplicaSetsExceptionWithoutMongoAddr() {
         ReplicaSets.create("");
     }
 
-
     @Test(expected = IllegalArgumentException.class)
-    public void testCreatReplicaSetsException02() {
+    public void testCreatReplicaSetsExceptioWithoutReplicaSetName() {
         ReplicaSets.create("127.0.0.1:27081");
     }
 
-
     @Test
-    public void testCreatReplicaSets01() {
+    public void testCreatReplicaSetsSpecialReplicaSetName() {
         ReplicaSets replicaSets = ReplicaSets.create("replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083");
         Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSets.getReplicaConfigByName();
         Assert.assertTrue(replicaSetConfigMap.size() == 1);
@@ -32,9 +28,8 @@ public class ReplicaSetsTest {
         Assert.assertEquals("replicaName1", replicaSetConfigMap.get("replicaName1").getReplicaSetName());
     }
 
-
     @Test
-    public void testCreatReplicaSets02() {
+    public void testCreatReplicaSetsSpecialShardNameAndReplicaSetName() {
         ReplicaSets replicaSets = ReplicaSets.create("shardName1=replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083");
         Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSets.getReplicaConfigByName();
         Assert.assertTrue(replicaSetConfigMap.size() == 1);
@@ -44,9 +39,8 @@ public class ReplicaSetsTest {
         Assert.assertEquals("shardName1", replicaSetConfigMap.get("replicaName1").getShardName());
     }
 
-
     @Test
-    public void testCreatReplicaSets03() {
+    public void testCreatReplicaSetsMutiMongoAddr() {
         ReplicaSets replicaSets = ReplicaSets.create("shardName1=replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083;shardName2=replicaName2/127.0.0.1:27281,127.0.0.1:27282,127.0.0.1:27283");
         Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSets.getReplicaConfigByName();
         Assert.assertTrue(replicaSetConfigMap.size() == 2);
@@ -55,7 +49,6 @@ public class ReplicaSetsTest {
         Assert.assertEquals("replicaName1", replicaSetConfigMap.get("replicaName1").getReplicaSetName());
         Assert.assertEquals("shardName1", replicaSetConfigMap.get("replicaName1").getShardName());
 
-
         Assert.assertNotNull(replicaSetConfigMap.get("replicaName2"));
         Assert.assertEquals("127.0.0.1:27281,127.0.0.1:27282,127.0.0.1:27283", replicaSetConfigMap.get("replicaName2").getHost());
         Assert.assertEquals("replicaName2", replicaSetConfigMap.get("replicaName2").getReplicaSetName());