You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:22:30 UTC

[rocketmq-connect] 07/13: support multiple mongo replicaset

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit f95bf2257d3bb9c57cfb2194876e57af45c5023e
Author: 李平 <lp...@alibaba-inc.com>
AuthorDate: Tue Aug 13 15:59:38 2019 +0800

    support multiple mongo replicaset
---
 ...ReplicatorConfig.java => SourceTaskConfig.java} |  35 ++---
 .../mongo/connector/MongoSourceConnector.java      |   4 +-
 .../connect/mongo/connector/MongoSourceTask.java   | 146 +++++++-------------
 .../mongo/connector/builder/MongoDataEntry.java    | 113 +++++++++++++++
 .../apache/connect/mongo/initsync/InitSync.java    |  61 ++++----
 .../apache/connect/mongo/replicator/Constants.java |   6 -
 .../apache/connect/mongo/replicator/Filter.java    |   8 +-
 .../connect/mongo/replicator/MongoReplicator.java  | 153 ---------------------
 .../connect/mongo/replicator/ReplicaSet.java       | 101 ++++++++++++++
 .../connect/mongo/replicator/ReplicaSetConfig.java | 139 +++++++++++++++++++
 .../connect/mongo/replicator/ReplicaSets.java      |  70 ++++++++++
 .../mongo/replicator/ReplicaSetsContext.java       | 129 +++++++++++++++++
 .../connect/mongo/replicator/ReplicatorTask.java   |  45 +++---
 .../replicator/event/DocumentConvertEvent.java     |  29 ----
 .../mongo/replicator/event/EventConverter.java     |  29 ++++
 .../mongo/replicator/event/ReplicationEvent.java   |  18 ++-
 .../java/org/apache/connect/mongo/FilterTest.java  |  28 ++--
 .../connect/mongo/MongoSourceConnectorTest.java    |  69 +++++++++-
 .../java/org/apache/connect/mongo/MongoTest.java   |  78 +++++++----
 .../apache/connect/mongo/ReplicaContextTest.java   |  34 +++++
 .../org/apache/connect/mongo/ReplicaSetsTest.java  |  65 +++++++++
 21 files changed, 946 insertions(+), 414 deletions(-)

diff --git a/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java b/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java
similarity index 83%
rename from src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java
rename to src/main/java/org/apache/connect/mongo/SourceTaskConfig.java
index 9097640..b79b2e9 100644
--- a/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java
+++ b/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java
@@ -1,37 +1,37 @@
 package org.apache.connect.mongo;
 
 import io.openmessaging.KeyValue;
-import org.apache.commons.lang3.StringUtils;
 import org.bson.BsonTimestamp;
 
 import java.lang.reflect.Method;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
-public class MongoReplicatorConfig {
+public class SourceTaskConfig {
 
     private String replicaSet;
     private String mongoAddr;
     private String mongoUserName;
     private String mongoPassWord;
     private String interestDbAndCollection;
-    private int positionTimeStamp;
-    private int positionInc;
-    private boolean dataSync;
+    private String positionTimeStamp;
+    private String positionInc;
+    private String dataSync;
     private int copyThread = Runtime.getRuntime().availableProcessors();
 
 
-    public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
+    public static final Set<String> REQUEST_CONFIG = Collections.unmodifiableSet(new HashSet<String>() {
         {
             add("mongoAddr");
         }
-    };
+    });
 
-    public int getPositionInc() {
+    public String getPositionInc() {
         return positionInc;
     }
 
-    public void setPositionInc(int positionInc) {
+    public void setPositionInc(String positionInc) {
         this.positionInc = positionInc;
     }
 
@@ -43,11 +43,11 @@ public class MongoReplicatorConfig {
         this.copyThread = copyThread;
     }
 
-    public int getPositionTimeStamp() {
+    public String getPositionTimeStamp() {
         return positionTimeStamp;
     }
 
-    public void setPositionTimeStamp(int positionTimeStamp) {
+    public void setPositionTimeStamp(String positionTimeStamp) {
         this.positionTimeStamp = positionTimeStamp;
     }
 
@@ -85,11 +85,11 @@ public class MongoReplicatorConfig {
     }
 
 
-    public boolean getDataSync() {
+    public String getDataSync() {
         return dataSync;
     }
 
-    public void setDataSync(boolean dataSync) {
+    public void setDataSync(String dataSync) {
         this.dataSync = dataSync;
     }
 
@@ -148,16 +148,9 @@ public class MongoReplicatorConfig {
         }
     }
 
-    public String getDataSouce() {
-        if (StringUtils.isBlank(replicaSet)) {
-            return mongoAddr;
-        }
-        return replicaSet + ":" + mongoAddr;
-    }
-
 
     public BsonTimestamp getPosition() {
-        return new BsonTimestamp(positionTimeStamp, positionInc);
+        return new BsonTimestamp(Integer.valueOf(positionTimeStamp), Integer.valueOf(positionInc));
     }
 
 }
diff --git a/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java b/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java
index 9e659c9..2b28ea2 100644
--- a/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java
+++ b/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java
@@ -3,7 +3,7 @@ package org.apache.connect.mongo.connector;
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.Task;
 import io.openmessaging.connector.api.source.SourceConnector;
-import org.apache.connect.mongo.MongoReplicatorConfig;
+import org.apache.connect.mongo.SourceTaskConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -18,7 +18,7 @@ public class MongoSourceConnector extends SourceConnector {
 
     @Override
     public String verifyAndSetConfig(KeyValue config) {
-        for (String requestKey : MongoReplicatorConfig.REQUEST_CONFIG) {
+        for (String requestKey : SourceTaskConfig.REQUEST_CONFIG) {
             if (!config.containsKey(requestKey)) {
                 return "Request config key: " + requestKey;
             }
diff --git a/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
index 7272878..407608a 100644
--- a/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
+++ b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
@@ -2,91 +2,71 @@ package org.apache.connect.mongo.connector;
 
 import com.alibaba.fastjson.JSONObject;
 import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.data.*;
+import io.openmessaging.connector.api.data.SourceDataEntry;
 import io.openmessaging.connector.api.source.SourceTask;
-import org.apache.connect.mongo.MongoReplicatorConfig;
-import org.apache.connect.mongo.replicator.Constants;
-import org.apache.connect.mongo.replicator.MongoReplicator;
-import org.apache.connect.mongo.replicator.event.OperationType;
-import org.apache.connect.mongo.replicator.event.ReplicationEvent;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.connect.mongo.SourceTaskConfig;
+import org.apache.connect.mongo.replicator.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
+import java.util.regex.Pattern;
 
 public class MongoSourceTask extends SourceTask {
 
     private Logger logger = LoggerFactory.getLogger(this.getClass());
 
-    private MongoReplicator mongoReplicator;
+    private SourceTaskConfig sourceTaskConfig;
 
-    private MongoReplicatorConfig replicatorConfig;
+    private ReplicaSets replicaSets;
 
-    private String mongoSource;
+    private ReplicaSetsContext replicaSetsContext;
+
+    private Pattern pattern = Pattern.compile("^[-\\+]?[\\d]*$");
 
     @Override
     public Collection<SourceDataEntry> poll() {
-        List<SourceDataEntry> res = new ArrayList<>();
-        ReplicationEvent event = mongoReplicator.getQueue().poll();
-        if (event == null) {
-            return new ArrayList<>();
-        }
 
-        JSONObject position = position(event);
-        Schema schema = new Schema();
-        schema.setDataSource(event.getDatabaseName());
-        schema.setName(event.getCollectionName());
-        schema.setFields(new ArrayList<>());
-        buildFieleds(schema);
-        DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
-        dataEntryBuilder.timestamp(System.currentTimeMillis())
-                .queue(event.getNamespace().replace(".", "-").replace("$", ""))
-                .entryType(event.getEntryType());
-
-        if (event.getOperationType().ordinal() == OperationType.CREATED.ordinal()) {
-            dataEntryBuilder.putFiled(Constants.CREATED, event.getDocument().toJson());
-            dataEntryBuilder.putFiled(Constants.NAMESPACE, event.getNamespace());
-        } else {
-            dataEntryBuilder.putFiled(Constants.OPERATIONTYPE, event.getOperationType().name());
-            dataEntryBuilder.putFiled(Constants.TIMESTAMP, event.getTimestamp().getValue());
-            dataEntryBuilder.putFiled(Constants.VERSION, event.getV());
-            dataEntryBuilder.putFiled(Constants.NAMESPACE, event.getNamespace());
-            dataEntryBuilder.putFiled(Constants.PATCH, event.getEventData().isPresent() ? JSONObject.toJSONString(event.getEventData().get()) : "");
-            dataEntryBuilder.putFiled(Constants.OBJECTID, event.getObjectId().isPresent() ? JSONObject.toJSONString(event.getObjectId().get()) : "");
-        }
-        SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
-                ByteBuffer.wrap(mongoSource.getBytes(StandardCharsets.UTF_8)),
-                ByteBuffer.wrap(position.toJSONString().getBytes(StandardCharsets.UTF_8)));
-        res.add(sourceDataEntry);
-        return res;
+        return replicaSetsContext.poll();
     }
 
     @Override
     public void start(KeyValue config) {
         try {
-            replicatorConfig = new MongoReplicatorConfig();
-            replicatorConfig.load(config);
-            mongoReplicator = new MongoReplicator(replicatorConfig);
-            mongoSource = replicatorConfig.getDataSouce();
-            ByteBuffer position = this.context.positionStorageReader().getPosition(ByteBuffer.wrap(
-                    mongoSource.getBytes()));
-
-            if (position != null && position.array().length > 0) {
-                String positionJson = new String(position.array(), StandardCharsets.UTF_8);
-                JSONObject jsonObject = JSONObject.parseObject(positionJson);
-                replicatorConfig.setPositionTimeStamp(jsonObject.getIntValue("timeStamp"));
-                replicatorConfig.setPositionInc(jsonObject.getIntValue("inc"));
-                replicatorConfig.setDataSync(jsonObject.getBooleanValue(Constants.INITSYNC));
-            } else {
-                replicatorConfig.setDataSync(true);
-            }
-            mongoReplicator.start();
-        }catch (Throwable throwable) {
-            logger.info("task start error", throwable);
+            sourceTaskConfig = new SourceTaskConfig();
+            sourceTaskConfig.load(config);
+            replicaSetsContext = new ReplicaSetsContext(sourceTaskConfig);
+            replicaSets = ReplicaSets.create(sourceTaskConfig.getMongoAddr());
+            replicaSets.getReplicaConfigByName().forEach((replicaSetName, replicaSetConfig) -> {
+                ByteBuffer byteBuffer = this.context.positionStorageReader().getPosition(ByteBuffer.wrap(
+                        replicaSetName.getBytes()));
+                if (byteBuffer != null && byteBuffer.array().length > 0) {
+                    String positionJson = new String(byteBuffer.array(), StandardCharsets.UTF_8);
+                    ReplicaSetConfig.Position position = JSONObject.parseObject(positionJson, ReplicaSetConfig.Position.class);
+                    replicaSetConfig.setPosition(position);
+                } else {
+                    ReplicaSetConfig.Position position = replicaSetConfig.emptyPosition();
+                    position.setTimeStamp(sourceTaskConfig.getPositionTimeStamp() != null
+                            && pattern.matcher(sourceTaskConfig.getPositionTimeStamp()).matches()
+                            ? Integer.valueOf(sourceTaskConfig.getPositionTimeStamp()) : 0);
+                    position.setInc(sourceTaskConfig.getPositionInc() != null
+                            && pattern.matcher(sourceTaskConfig.getPositionInc()).matches()
+                            ? Integer.valueOf(sourceTaskConfig.getPositionInc()) : 0);
+                    position.setInitSync(StringUtils.equals(sourceTaskConfig.getDataSync(), Constants.INITSYNC) ? true : false);
+                    replicaSetConfig.setPosition(position);
+                }
+
+                ReplicaSet replicaSet = new ReplicaSet(replicaSetConfig, replicaSetsContext);
+                replicaSetsContext.addReplicaSet(replicaSet);
+                replicaSet.start();
+            });
+
+
+        } catch (Throwable throwable) {
+            logger.error("task start error", throwable);
             stop();
         }
     }
@@ -94,51 +74,19 @@ public class MongoSourceTask extends SourceTask {
     @Override
     public void stop() {
         logger.info("shut down.....");
-        mongoReplicator.shutdown();
+        replicaSetsContext.shutdown();
     }
 
     @Override
     public void pause() {
-        mongoReplicator.pause();
+        logger.info("pause replica task...");
+        replicaSetsContext.pause();
     }
 
     @Override
     public void resume() {
-        mongoReplicator.resume();
-    }
-
-    private void buildFieleds(Schema schema) {
-        Field op = new Field(0, Constants.OPERATIONTYPE, FieldType.STRING);
-        schema.getFields().add(op);
-        Field time = new Field(1, Constants.TIMESTAMP, FieldType.INT64);
-        schema.getFields().add(time);
-        Field v = new Field(2, Constants.VERSION, FieldType.INT32);
-        schema.getFields().add(v);
-        Field namespace = new Field(3, Constants.NAMESPACE, FieldType.STRING);
-        schema.getFields().add(namespace);
-        Field operation = new Field(4, Constants.CREATED, FieldType.STRING);
-        schema.getFields().add(operation);
-        Field patch = new Field(5, Constants.PATCH, FieldType.STRING);
-        schema.getFields().add(patch);
-        Field objectId = new Field(6, Constants.OBJECTID, FieldType.STRING);
-        schema.getFields().add(objectId);
+        logger.info("resume replica task...");
+        replicaSetsContext.resume();
     }
 
-    private JSONObject position(ReplicationEvent event) {
-        JSONObject jsonObject = new JSONObject();
-        switch (event.getOperationType()) {
-            case CREATED:
-                jsonObject.put(Constants.POSITION_TIMESTAMP, 0);
-                jsonObject.put(Constants.POSITION_INC, 0);
-                jsonObject.put(Constants.INITSYNC, true);
-                break;
-            default:
-                jsonObject.put(Constants.POSITION_TIMESTAMP, event.getTimestamp().getTime());
-                jsonObject.put(Constants.POSITION_INC, event.getTimestamp().getInc());
-                jsonObject.put(Constants.INITSYNC, false);
-                break;
-        }
-        return jsonObject;
-
-    }
 }
diff --git a/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java b/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java
new file mode 100644
index 0000000..7c0db7d
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java
@@ -0,0 +1,113 @@
+package org.apache.connect.mongo.connector.builder;
+
+
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.connector.api.data.*;
+import org.apache.connect.mongo.replicator.Constants;
+import org.apache.connect.mongo.replicator.ReplicaSetConfig;
+import org.apache.connect.mongo.replicator.event.OperationType;
+import org.apache.connect.mongo.replicator.event.ReplicationEvent;
+import org.bson.BsonTimestamp;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+
+import static org.apache.connect.mongo.replicator.Constants.*;
+
+public class MongoDataEntry {
+
+    private static String SCHEMA_CREATED_NAME = "mongo_created";
+    private static String SCHEMA_OPLOG_NAME = "mongo_oplog";
+
+    public static SourceDataEntry createSouceDataEntry(ReplicationEvent event, ReplicaSetConfig replicaSetConfig) {
+
+        DataEntryBuilder dataEntryBuilder;
+
+        if (event.getOperationType().equals(OperationType.CREATED)) {
+            Schema schema = createdSchema(replicaSetConfig.getReplicaSetName());
+            dataEntryBuilder = new DataEntryBuilder(schema);
+            dataEntryBuilder.timestamp(System.currentTimeMillis())
+                    .queue(event.getNamespace().replace(".", "-").replace("$", "-"))
+                    .entryType(event.getEntryType());
+
+            dataEntryBuilder.putFiled(CREATED, event.getDocument().toJson());
+            dataEntryBuilder.putFiled(NAMESPACE, event.getNamespace());
+
+        } else {
+            Schema schema = oplogSchema(replicaSetConfig.getReplicaSetName());
+            dataEntryBuilder = new DataEntryBuilder(schema);
+            dataEntryBuilder.timestamp(System.currentTimeMillis())
+                    .queue(event.getNamespace().replace(".", "-").replace("$", "-"))
+                    .entryType(event.getEntryType());
+            dataEntryBuilder.putFiled(OPERATIONTYPE, event.getOperationType().name());
+            dataEntryBuilder.putFiled(TIMESTAMP, event.getTimestamp().getValue());
+            dataEntryBuilder.putFiled(VERSION, event.getV());
+            dataEntryBuilder.putFiled(NAMESPACE, event.getNamespace());
+            dataEntryBuilder.putFiled(PATCH, event.getEventData().isPresent() ? JSONObject.toJSONString(event.getEventData().get()) : "");
+            dataEntryBuilder.putFiled(OBJECTID, event.getObjectId().isPresent() ? JSONObject.toJSONString(event.getObjectId().get()) : "");
+        }
+
+
+        String position = createPosition(event, replicaSetConfig);
+        SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
+                ByteBuffer.wrap(replicaSetConfig.getReplicaSetName().getBytes(StandardCharsets.UTF_8)),
+                ByteBuffer.wrap(position.getBytes(StandardCharsets.UTF_8)));
+        return sourceDataEntry;
+    }
+
+
+    private static String createPosition(ReplicationEvent event, ReplicaSetConfig replicaSetConfig) {
+        ReplicaSetConfig.Position position = replicaSetConfig.emptyPosition();
+        BsonTimestamp timestamp = event.getTimestamp();
+        position.setInc(timestamp != null ? timestamp.getInc() : 0);
+        position.setTimeStamp(timestamp != null ? timestamp.getTime() : 0);
+        position.setInitSync(event.getOperationType().equals(OperationType.CREATED) ? true : false);
+        return JSONObject.toJSONString(position);
+
+    }
+
+    private static Schema createdSchema(String dataSourceName) {
+        Schema schema = new Schema();
+        schema.setDataSource(dataSourceName);
+        schema.setName(SCHEMA_CREATED_NAME);
+        schema.setFields(new ArrayList<>());
+        createdField(schema);
+        return schema;
+    }
+
+
+    private static Schema oplogSchema(String dataSourceName) {
+        Schema schema = new Schema();
+        schema.setDataSource(dataSourceName);
+        schema.setName(SCHEMA_OPLOG_NAME);
+        oplogField(schema);
+        return schema;
+    }
+
+
+    private static void createdField(Schema schema) {
+        Field namespace = new Field(0, NAMESPACE, FieldType.STRING);
+        schema.getFields().add(namespace);
+        Field operation = new Field(1, Constants.CREATED, FieldType.STRING);
+        schema.getFields().add(operation);
+    }
+
+    private static void oplogField(Schema schema) {
+        schema.setFields(new ArrayList<>());
+        Field op = new Field(0, OPERATIONTYPE, FieldType.STRING);
+        schema.getFields().add(op);
+        Field time = new Field(1, TIMESTAMP, FieldType.INT64);
+        schema.getFields().add(time);
+        Field v = new Field(2, VERSION, FieldType.INT32);
+        schema.getFields().add(v);
+        Field namespace = new Field(3, NAMESPACE, FieldType.STRING);
+        schema.getFields().add(namespace);
+        Field patch = new Field(4, PATCH, FieldType.STRING);
+        schema.getFields().add(patch);
+        Field objectId = new Field(5, OBJECTID, FieldType.STRING);
+        schema.getFields().add(objectId);
+    }
+
+
+}
diff --git a/src/main/java/org/apache/connect/mongo/initsync/InitSync.java b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
index e12412b..d92e968 100644
--- a/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
+++ b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
@@ -3,45 +3,48 @@ package org.apache.connect.mongo.initsync;
 import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoCursor;
 import com.mongodb.client.MongoIterable;
-import org.apache.connect.mongo.replicator.event.DocumentConvertEvent;
+import org.apache.connect.mongo.replicator.ReplicaSet;
+import org.apache.connect.mongo.replicator.ReplicaSetConfig;
+import org.apache.connect.mongo.replicator.ReplicaSetsContext;
+import org.apache.connect.mongo.replicator.event.EventConverter;
 import org.apache.connect.mongo.replicator.event.OperationType;
 import org.apache.connect.mongo.replicator.event.ReplicationEvent;
-import org.apache.connect.mongo.replicator.Filter;
-import org.apache.connect.mongo.replicator.MongoReplicator;
-import org.apache.connect.mongo.MongoReplicatorConfig;
 import org.bson.Document;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashSet;
 import java.util.Set;
-import java.util.concurrent.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class InitSync {
 
     private Logger logger = LoggerFactory.getLogger(this.getClass());
 
-    private MongoReplicatorConfig mongoReplicatorConfig;
+    private ReplicaSetConfig replicaSetConfig;
     private ExecutorService copyExecutor;
     private MongoClient mongoClient;
-    private Filter filter;
+    private ReplicaSetsContext context;
     private int copyThreadCount;
     private Set<CollectionMeta> interestCollections;
     private CountDownLatch countDownLatch;
-    private MongoReplicator mongoReplicator;
+    private ReplicaSet replicaSet;
 
-    public InitSync(MongoReplicatorConfig mongoReplicatorConfig, MongoClient mongoClient, Filter filter, MongoReplicator mongoReplicator) {
-        this.mongoReplicatorConfig = mongoReplicatorConfig;
+    public InitSync(ReplicaSetConfig replicaSetConfig, MongoClient mongoClient, ReplicaSetsContext context, ReplicaSet replicaSet) {
+        this.replicaSetConfig = replicaSetConfig;
         this.mongoClient = mongoClient;
-        this.filter = filter;
-        this.mongoReplicator = mongoReplicator;
+        this.context = context;
+        this.replicaSet = replicaSet;
         init();
     }
 
     public void start() {
         for (CollectionMeta collectionMeta : interestCollections) {
-            copyExecutor.submit(new CopyRunner(mongoClient, countDownLatch, collectionMeta, mongoReplicator));
+            copyExecutor.submit(new CopyRunner(mongoClient, countDownLatch, collectionMeta, replicaSet));
         }
         try {
             countDownLatch.await();
@@ -53,7 +56,7 @@ public class InitSync {
 
     private void init() {
         interestCollections = getInterestCollection();
-        copyThreadCount = Math.min(interestCollections.size(), mongoReplicatorConfig.getCopyThread());
+        copyThreadCount = Math.min(interestCollections.size(), context.getCopyThread());
         copyExecutor = Executors.newFixedThreadPool(copyThreadCount, new ThreadFactory() {
 
             AtomicInteger threads = new AtomicInteger();
@@ -76,7 +79,7 @@ public class InitSync {
             while (collIterator.hasNext()) {
                 String collectionName = collIterator.next();
                 CollectionMeta collectionMeta = new CollectionMeta(dataBaseName, collectionName);
-                if (filter.filter(collectionMeta)) {
+                if (context.filterMeta(collectionMeta)) {
                     res.add(collectionMeta);
                 }
             }
@@ -92,40 +95,46 @@ public class InitSync {
         private MongoClient mongoClient;
         private CountDownLatch countDownLatch;
         private CollectionMeta collectionMeta;
-        private MongoReplicator mongoReplicator;
+        private ReplicaSet replicaSet;
 
-        public CopyRunner(MongoClient mongoClient, CountDownLatch countDownLatch, CollectionMeta collectionMeta, MongoReplicator mongoReplicator) {
+        public CopyRunner(MongoClient mongoClient, CountDownLatch countDownLatch, CollectionMeta collectionMeta, ReplicaSet replicaSet) {
             this.mongoClient = mongoClient;
             this.countDownLatch = countDownLatch;
             this.collectionMeta = collectionMeta;
-            this.mongoReplicator = mongoReplicator;
+            this.replicaSet = replicaSet;
         }
 
         @Override
         public void run() {
-
+            logger.info("start copy database:{}, collection:{}", collectionMeta.getDatabaseName(), collectionMeta.getCollectionName());
+            int count = 0;
             try {
-
                 MongoCursor<Document> mongoCursor = mongoClient.getDatabase(collectionMeta.getDatabaseName())
                         .getCollection(collectionMeta.getCollectionName())
                         .find()
                         .batchSize(200)
                         .iterator();
-
-                while (mongoReplicator.isRuning() && mongoCursor.hasNext()) {
+                while (replicaSet.isRuning() && mongoCursor.hasNext()) {
+                    if (context.initSyncAbort()) {
+                        logger.info("init sync database:{}, collection:{} abort, has copy:{} document", collectionMeta.getDatabaseName(), collectionMeta.getCollectionName(), count);
+                        return;
+                    }
+                    count++;
                     Document document = mongoCursor.next();
-                    ReplicationEvent event = DocumentConvertEvent.convert(document);
+                    ReplicationEvent event = EventConverter.convert(document, replicaSetConfig.getReplicaSetName());
                     event.setOperationType(OperationType.CREATED);
                     event.setNamespace(collectionMeta.getNameSpace());
-                    mongoReplicator.publishEvent(event);
+                    context.publishEvent(event, replicaSetConfig);
                 }
 
             } catch (Exception e) {
-                logger.error("init sync database:{}, collection:{} error", collectionMeta.getDatabaseName(), collectionMeta.getNameSpace());
+                context.initSyncError();
+                logger.error("init sync database:{}, collection:{} error", collectionMeta.getDatabaseName(), collectionMeta.getNameSpace(), e);
             } finally {
                 countDownLatch.countDown();
+                replicaSet.shutdown();
             }
-            logger.info("database:{}, collection:{}, init sync done", collectionMeta.getDatabaseName(), collectionMeta.getCollectionName());
+            logger.info("database:{}, collection:{}, copy {} documents, init sync done", collectionMeta.getDatabaseName(), collectionMeta.getCollectionName(), count);
         }
     }
 
diff --git a/src/main/java/org/apache/connect/mongo/replicator/Constants.java b/src/main/java/org/apache/connect/mongo/replicator/Constants.java
index 668fd91..1a91a57 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/Constants.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/Constants.java
@@ -3,8 +3,6 @@ package org.apache.connect.mongo.replicator;
 public class Constants {
 
 
-    public static final String APPLICATION_NAME = "java-mongo-replicator";
-
 
     public static final String MONGO_LOCAL_DATABASE = "local";
     public static final String MONGO_OPLOG_RS = "oplog.rs";
@@ -23,11 +21,7 @@ public class Constants {
     public static final String CREATED = "created";
     public static final String PATCH = "patch";
 
-    public static final String INITIAL = "initial";
-
 
-    public static final String POSITION_TIMESTAMP = "timeStamp";
-    public static final String POSITION_INC = "inc";
     public static final String INITSYNC = "initSync";
 
 }
diff --git a/src/main/java/org/apache/connect/mongo/replicator/Filter.java b/src/main/java/org/apache/connect/mongo/replicator/Filter.java
index 4a62e41..3e431a3 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/Filter.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/Filter.java
@@ -4,7 +4,7 @@ package org.apache.connect.mongo.replicator;
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.TypeReference;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.connect.mongo.MongoReplicatorConfig;
+import org.apache.connect.mongo.SourceTaskConfig;
 import org.apache.connect.mongo.initsync.CollectionMeta;
 import org.apache.connect.mongo.replicator.event.OperationType;
 import org.apache.connect.mongo.replicator.event.ReplicationEvent;
@@ -21,9 +21,9 @@ public class Filter {
     private Function<OperationType, Boolean> notNoopFilter;
 
 
-    public Filter(MongoReplicatorConfig mongoReplicatorConfig) {
+    public Filter(SourceTaskConfig sourceTaskConfig) {
 
-        String interestDbAndCollection = mongoReplicatorConfig.getInterestDbAndCollection();
+        String interestDbAndCollection = sourceTaskConfig.getInterestDbAndCollection();
 
         if (StringUtils.isNotBlank(interestDbAndCollection)) {
             JSONObject jsonObject = JSONObject.parseObject(interestDbAndCollection);
@@ -57,7 +57,7 @@ public class Filter {
     }
 
 
-    public boolean filter(CollectionMeta collectionMeta) {
+    public boolean filterMeta(CollectionMeta collectionMeta) {
         return dbAndCollectionFilter.apply(collectionMeta);
     }
 
diff --git a/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java b/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java
deleted file mode 100644
index 60b8d3d..0000000
--- a/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java
+++ /dev/null
@@ -1,153 +0,0 @@
-package org.apache.connect.mongo.replicator;
-
-import com.mongodb.ConnectionString;
-import com.mongodb.MongoClientSettings;
-import com.mongodb.client.MongoClient;
-import com.mongodb.client.MongoClients;
-import com.mongodb.client.MongoDatabase;
-import com.mongodb.client.MongoIterable;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.connect.mongo.MongoReplicatorConfig;
-import org.apache.connect.mongo.replicator.event.ReplicationEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.apache.connect.mongo.replicator.Constants.*;
-
-
-public class MongoReplicator {
-
-    private Logger logger = LoggerFactory.getLogger(this.getClass());
-
-    private AtomicBoolean running = new AtomicBoolean();
-
-    private MongoReplicatorConfig mongoReplicatorConfig;
-
-    private MongoClientSettings clientSettings;
-
-    private ConnectionString connectionString;
-
-    private MongoClient mongoClient;
-
-    private BlockingQueue<ReplicationEvent> queue = new LinkedBlockingQueue<>();
-
-    private Filter filter;
-
-    private ExecutorService executorService;
-
-    private volatile boolean pause = false;
-
-    public MongoReplicator(MongoReplicatorConfig mongoReplicatorConfig) {
-        this.mongoReplicatorConfig = mongoReplicatorConfig;
-        this.filter = new Filter(mongoReplicatorConfig);
-        this.executorService = Executors.newSingleThreadExecutor((r) ->new Thread(r, "real_time_replica_thread"));
-
-        buildConnectionString();
-    }
-
-    public void start() {
-
-        try {
-            if (!running.compareAndSet(false, true)) {
-                logger.info("the java mongo replica already start");
-                return;
-            }
-
-            this.clientSettings = MongoClientSettings.builder()
-                    .applicationName(APPLICATION_NAME)
-                    .applyConnectionString(connectionString)
-                    .build();
-            this.mongoClient = MongoClients.create(clientSettings);
-            this.isReplicaMongo();
-            executorService.submit(new ReplicatorTask(this, mongoClient, mongoReplicatorConfig, filter));
-        }catch (Exception e) {
-            logger.info("start replicator error", e);
-            shutdown();
-        }
-    }
-
-
-    private void buildConnectionString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("mongodb://");
-        if (StringUtils.isNotBlank(mongoReplicatorConfig.getMongoUserName())
-                && StringUtils.isNotBlank(mongoReplicatorConfig.getMongoPassWord())) {
-            sb.append(mongoReplicatorConfig.getMongoUserName());
-            sb.append(":");
-            sb.append(mongoReplicatorConfig.getMongoPassWord());
-            sb.append("@");
-
-        }
-        sb.append(mongoReplicatorConfig.getMongoAddr());
-        sb.append("/");
-        if (StringUtils.isNotBlank(mongoReplicatorConfig.getReplicaSet())) {
-            sb.append("?");
-            sb.append("replicaSet=");
-            sb.append(mongoReplicatorConfig.getReplicaSet());
-        }
-        this.connectionString = new ConnectionString(sb.toString());
-    }
-
-
-    public boolean isReplicaMongo() {
-        MongoDatabase local = mongoClient.getDatabase(MONGO_LOCAL_DATABASE);
-        MongoIterable<String> collectionNames = local.listCollectionNames();
-        for (String collectionName : collectionNames) {
-            if (MONGO_OPLOG_RS.equals(collectionName)) {
-                return true;
-            }
-        }
-        this.shutdown();
-        throw new IllegalStateException(String.format("url:%s, set:%s is not replica", mongoReplicatorConfig.getMongoAddr(), mongoReplicatorConfig.getReplicaSet()));
-    }
-
-    public void shutdown() {
-        if (running.compareAndSet(true, false)) {
-            if (!this.executorService.isShutdown()) {
-                executorService.shutdown();
-            }
-            if (this.mongoClient != null) {
-                this.mongoClient.close();
-            }
-        }
-
-    }
-
-    public void publishEvent(ReplicationEvent replicationEvent) {
-        while (true) {
-            try {
-                queue.put(replicationEvent);
-                break;
-            } catch (Exception e) {
-            }
-        }
-    }
-
-
-
-    public void pause() {
-        pause = true;
-    }
-
-    public void resume() {
-        pause = false;
-    }
-
-    public boolean isPause() {
-        return pause;
-    }
-
-    public boolean isRuning() {
-        return running.get();
-    }
-
-    public BlockingQueue<ReplicationEvent> getQueue() {
-        return queue;
-    }
-}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java
new file mode 100644
index 0000000..b141c2b
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java
@@ -0,0 +1,101 @@
+package org.apache.connect.mongo.replicator;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.MongoIterable;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.connect.mongo.replicator.Constants.MONGO_LOCAL_DATABASE;
+import static org.apache.connect.mongo.replicator.Constants.MONGO_OPLOG_RS;
+
+
+public class ReplicaSet {
+
+    private Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    private AtomicBoolean running = new AtomicBoolean();
+
+    private ReplicaSetConfig replicaSetConfig;
+
+
+    private ReplicaSetsContext replicaSetsContext;
+
+    private MongoClient mongoClient;
+
+    private ExecutorService executorService;
+
+    private volatile boolean pause = false;
+
+    public ReplicaSet(ReplicaSetConfig replicaSetConfig, ReplicaSetsContext replicaSetsContext) {
+        this.replicaSetConfig = replicaSetConfig;
+        this.replicaSetsContext = replicaSetsContext;
+        this.executorService = Executors.newSingleThreadExecutor((r) -> new Thread(r, "real_time_replica_" + replicaSetConfig.getReplicaSetName() + "thread"));
+
+    }
+
+    public void start() {
+
+        try {
+            if (!running.compareAndSet(false, true)) {
+                logger.info("the java mongo replica already start");
+                return;
+            }
+            this.mongoClient = replicaSetsContext.createMongoClient(replicaSetConfig);
+            this.isReplicaMongo();
+            executorService.submit(new ReplicatorTask(this, mongoClient, replicaSetConfig, replicaSetsContext));
+        } catch (Exception e) {
+            logger.error("start replicator:{} error", replicaSetConfig, e);
+            shutdown();
+        }
+    }
+
+
+    public boolean isReplicaMongo() {
+        MongoDatabase local = mongoClient.getDatabase(MONGO_LOCAL_DATABASE);
+        MongoIterable<String> collectionNames = local.listCollectionNames();
+        MongoCursor<String> iterator = collectionNames.iterator();
+        while (iterator.hasNext()) {
+            if (StringUtils.equals(MONGO_OPLOG_RS, iterator.next())) {
+                return true;
+            }
+        }
+        this.shutdown();
+        throw new IllegalStateException(String.format("url:%s,  is not replica", replicaSetConfig.getHost()));
+    }
+
+    public void shutdown() {
+        if (running.compareAndSet(true, false)) {
+            if (!this.executorService.isShutdown()) {
+                executorService.shutdown();
+            }
+            if (this.mongoClient != null) {
+                this.mongoClient.close();
+            }
+        }
+
+    }
+
+
+    public void pause() {
+        pause = true;
+    }
+
+    public void resume() {
+        pause = false;
+    }
+
+    public boolean isPause() {
+        return pause;
+    }
+
+    public boolean isRuning() {
+        return running.get();
+    }
+}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java
new file mode 100644
index 0000000..4b8d148
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java
@@ -0,0 +1,139 @@
+package org.apache.connect.mongo.replicator;
+
+import org.bson.BsonTimestamp;
+
+import java.util.Objects;
+
+public class ReplicaSetConfig {
+
+
+    private String shardName;
+    private String replicaSetName;
+    private String host;
+    private Position position;
+
+
+    public Position getPosition() {
+        return position;
+    }
+
+    public void setPosition(Position position) {
+        this.position = position;
+    }
+
+    public String getShardName() {
+        return shardName;
+    }
+
+    public void setShardName(String shardName) {
+        this.shardName = shardName;
+    }
+
+    public String getReplicaSetName() {
+        return replicaSetName;
+    }
+
+    public void setReplicaSetName(String replicaSetName) {
+        this.replicaSetName = replicaSetName;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    public ReplicaSetConfig(String shardName, String replicaSetName, String host) {
+        this.shardName = shardName;
+        this.replicaSetName = replicaSetName;
+        this.host = host;
+    }
+
+    public Position emptyPosition() {
+        return new Position(0, 0, true);
+    }
+
+
+    public class Position {
+        private int timeStamp;
+        private int inc;
+        private boolean initSync;
+
+
+        public int getTimeStamp() {
+            return timeStamp;
+        }
+
+        public void setTimeStamp(int timeStamp) {
+            this.timeStamp = timeStamp;
+        }
+
+        public int getInc() {
+            return inc;
+        }
+
+        public void setInc(int inc) {
+            this.inc = inc;
+        }
+
+        public boolean isInitSync() {
+            return initSync;
+        }
+
+        public void setInitSync(boolean initSync) {
+            this.initSync = initSync;
+        }
+
+
+        public Position(int timeStamp, int inc, boolean initSync) {
+            this.timeStamp = timeStamp;
+            this.inc = inc;
+            this.initSync = initSync;
+        }
+
+        public boolean isValid() {
+            return timeStamp > 0;
+        }
+
+        public BsonTimestamp converBsonTimeStamp() {
+            return new BsonTimestamp(timeStamp, inc);
+        }
+
+        @Override
+        public String toString() {
+            return "Position{" +
+                    "timeStamp=" + timeStamp +
+                    ", inc=" + inc +
+                    ", initSync=" + initSync +
+                    '}';
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Position position = (Position) o;
+            return timeStamp == position.timeStamp &&
+                    inc == position.inc &&
+                    initSync == position.initSync;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(timeStamp, inc, initSync);
+        }
+    }
+
+
+    @Override
+    public String toString() {
+        return "ReplicaSetConfig{" +
+                "shardName='" + shardName + '\'' +
+                ", replicaSetName='" + replicaSetName + '\'' +
+                ", host='" + host + '\'' +
+                ", position=" + position +
+                '}';
+    }
+}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSets.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSets.java
new file mode 100644
index 0000000..9184b90
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSets.java
@@ -0,0 +1,70 @@
+package org.apache.connect.mongo.replicator;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.Validate;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class ReplicaSets {
+
+    private static final Pattern HOST_PATTERN = Pattern.compile("((([^=]+)[=])?(([^/]+)\\/))?(.+)");
+
+
+    private final Map<String, ReplicaSetConfig> replicaConfigByName = new HashMap<>();
+
+
+    public ReplicaSets(Set<ReplicaSetConfig> replicaSetConfigs) {
+        replicaSetConfigs.forEach(replicaSetConfig -> {
+            if (StringUtils.isNotBlank(replicaSetConfig.getReplicaSetName())) {
+                replicaConfigByName.put(replicaSetConfig.getReplicaSetName(), replicaSetConfig);
+            }
+        });
+
+        validate();
+    }
+
+    public static ReplicaSets create(String hosts) {
+        Set<ReplicaSetConfig> replicaSetConfigs = new HashSet<>();
+        if (hosts != null) {
+            for (String replicaSetStr : StringUtils.split(hosts.trim(), ";")) {
+                if (StringUtils.isNotBlank(replicaSetStr)) {
+                    ReplicaSetConfig replicaSetConfig = parseReplicaSetStr(replicaSetStr);
+                    if (replicaSetConfig != null) {
+                        replicaSetConfigs.add(replicaSetConfig);
+                    }
+                }
+            }
+        }
+        return new ReplicaSets(replicaSetConfigs);
+    }
+
+
+    private static ReplicaSetConfig parseReplicaSetStr(String hosts) {
+        if (hosts != null) {
+            Matcher matcher = HOST_PATTERN.matcher(hosts);
+            if (matcher.matches()) {
+                String shard = matcher.group(3);
+                String replicaSetName = matcher.group(5);
+                String host = matcher.group(6);
+                if (host != null && host.trim().length() != 0) {
+                    return new ReplicaSetConfig(shard, replicaSetName, host);
+                }
+            }
+        }
+        return null;
+    }
+
+    private void validate() {
+        Validate.isTrue(replicaConfigByName.size() > 0, "task config mongoAdd need special replicaSet addr");
+
+    }
+
+    public Map<String, ReplicaSetConfig> getReplicaConfigByName() {
+        return replicaConfigByName;
+    }
+}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java
new file mode 100644
index 0000000..f66ca14
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java
@@ -0,0 +1,129 @@
+package org.apache.connect.mongo.replicator;
+
+import com.mongodb.ConnectionString;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import io.openmessaging.connector.api.data.SourceDataEntry;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.connect.mongo.SourceTaskConfig;
+import org.apache.connect.mongo.connector.builder.MongoDataEntry;
+import org.apache.connect.mongo.initsync.CollectionMeta;
+import org.apache.connect.mongo.replicator.event.ReplicationEvent;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ReplicaSetsContext {
+
+    private BlockingQueue<SourceDataEntry> dataEntryQueue;
+
+    private SourceTaskConfig taskConfig;
+
+    private List<ReplicaSet> replicaSets;
+
+    private AtomicBoolean initSyncAbort = new AtomicBoolean();
+
+    private Filter filter;
+
+    public ReplicaSetsContext(SourceTaskConfig taskConfig) {
+        this.taskConfig = taskConfig;
+        this.replicaSets = new CopyOnWriteArrayList<>();
+        this.dataEntryQueue = new LinkedBlockingDeque<>();
+        this.filter = new Filter(taskConfig);
+    }
+
+
+    public MongoClient createMongoClient(ReplicaSetConfig replicaSetConfig) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("mongodb://");
+        if (StringUtils.isNotBlank(taskConfig.getMongoUserName())
+                && StringUtils.isNotBlank(taskConfig.getMongoPassWord())) {
+            sb.append(taskConfig.getMongoUserName());
+            sb.append(":");
+            sb.append(taskConfig.getMongoPassWord());
+            sb.append("@");
+
+        }
+        sb.append(replicaSetConfig.getHost());
+        sb.append("/");
+        if (StringUtils.isNotBlank(replicaSetConfig.getReplicaSetName())) {
+            sb.append("?");
+            sb.append("replicaSet=");
+            sb.append(replicaSetConfig.getReplicaSetName());
+        }
+        ConnectionString connectionString = new ConnectionString(sb.toString());
+        return MongoClients.create(connectionString);
+    }
+
+
+    public boolean filterEvent(ReplicationEvent event) {
+        return filter.filterEvent(event);
+    }
+
+
+    public boolean filterMeta(CollectionMeta collectionMeta) {
+        return filter.filterMeta(collectionMeta);
+    }
+
+
+    public int getCopyThread() {
+        return taskConfig.getCopyThread() > 0 ? taskConfig.getCopyThread() : Runtime.getRuntime().availableProcessors();
+    }
+
+
+    public void addReplicaSet(ReplicaSet replicaSet) {
+        this.replicaSets.add(replicaSet);
+    }
+
+
+    public void shutdown() {
+        replicaSets.forEach(ReplicaSet::shutdown);
+    }
+
+    public void pause() {
+        replicaSets.forEach(ReplicaSet::pause);
+    }
+
+
+    public void resume() {
+        replicaSets.forEach(ReplicaSet::resume);
+    }
+
+
+    public void publishEvent(ReplicationEvent event, ReplicaSetConfig replicaSetConfig) {
+        SourceDataEntry sourceDataEntry = MongoDataEntry.createSouceDataEntry(event, replicaSetConfig);
+        while (true) {
+            try {
+                dataEntryQueue.put(sourceDataEntry);
+                break;
+            } catch (InterruptedException e) {
+            }
+        }
+
+    }
+
+    public Collection<SourceDataEntry> poll() {
+        List<SourceDataEntry> res = new ArrayList<>();
+        if (dataEntryQueue.drainTo(res, 20) == 0) {
+            try {
+                Thread.sleep(10);
+            } catch (InterruptedException e) {
+            }
+        }
+        return res;
+    }
+
+    public boolean initSyncAbort() {
+        return initSyncAbort.get();
+    }
+
+    public void initSyncError() {
+        initSyncAbort.set(true);
+    }
+
+}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
index 766225f..bf4ebac 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
@@ -6,9 +6,8 @@ import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoCursor;
 import com.mongodb.client.MongoDatabase;
 import com.mongodb.client.model.Filters;
-import org.apache.connect.mongo.MongoReplicatorConfig;
 import org.apache.connect.mongo.initsync.InitSync;
-import org.apache.connect.mongo.replicator.event.DocumentConvertEvent;
+import org.apache.connect.mongo.replicator.event.EventConverter;
 import org.apache.connect.mongo.replicator.event.ReplicationEvent;
 import org.bson.Document;
 import org.slf4j.Logger;
@@ -19,34 +18,34 @@ public class ReplicatorTask implements Runnable {
 
     private Logger logger = LoggerFactory.getLogger(this.getClass());
 
-    private MongoReplicator mongoReplicator;
+    private ReplicaSet replicaSet;
 
     private MongoClient mongoClient;
 
-    private MongoReplicatorConfig mongoReplicatorConfig;
+    private ReplicaSetConfig replicaSetConfig;
 
-    private Filter filter;
+    private ReplicaSetsContext replicaSetsContext;
 
-    public ReplicatorTask(MongoReplicator mongoReplicator, MongoClient mongoClient, MongoReplicatorConfig mongoReplicatorConfig, Filter filter) {
-        this.mongoReplicator = mongoReplicator;
-        this.mongoReplicatorConfig = mongoReplicatorConfig;
+    public ReplicatorTask(ReplicaSet replicaSet, MongoClient mongoClient, ReplicaSetConfig replicaSetConfig, ReplicaSetsContext replicaSetsContext) {
+        this.replicaSet = replicaSet;
+        this.replicaSetConfig = replicaSetConfig;
         this.mongoClient = mongoClient;
-        this.filter = filter;
+        this.replicaSetsContext = replicaSetsContext;
     }
 
     @Override
     public void run() {
 
-        if (mongoReplicatorConfig.getDataSync()) {
-            InitSync initSync = new InitSync(mongoReplicatorConfig, mongoClient, filter, mongoReplicator);
+        if (replicaSetConfig.getPosition() == null || replicaSetConfig.getPosition().isInitSync()) {
+            InitSync initSync = new InitSync(replicaSetConfig, mongoClient, replicaSetsContext, replicaSet);
             initSync.start();
         }
 
         MongoDatabase localDataBase = mongoClient.getDatabase(Constants.MONGO_LOCAL_DATABASE);
         FindIterable<Document> iterable;
-        if (mongoReplicatorConfig.getPositionTimeStamp() > 0 && mongoReplicatorConfig.getPositionTimeStamp() < System.currentTimeMillis()) {
+        if (replicaSetConfig.getPosition().isValid()) {
             iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find(
-                    Filters.gt("ts", mongoReplicatorConfig.getPosition()));
+                    Filters.gt("ts", replicaSetConfig.getPosition().converBsonTimeStamp()));
         } else {
             iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find();
         }
@@ -56,26 +55,28 @@ public class ReplicatorTask implements Runnable {
                 .batchSize(200)
                 .iterator();
 
-        while (mongoReplicator.isRuning()) {
+        while (replicaSet.isRuning()) {
             try {
                 executorCursor(cursor);
-                Thread.sleep(100);
             } catch (Exception e) {
-                e.printStackTrace();
+                logger.error("replicaSet:{} shutdown.....", replicaSetConfig, e);
             } finally {
-                logger.error("mongoReplicator shutdown.....");
-                mongoReplicator.shutdown();
+                if (cursor != null) {
+                    cursor.close();
+                }
+                replicaSet.shutdown();
             }
         }
+        logger.info("replicaSet:{}, already shutdown, replicaTask end of life cycle", replicaSetConfig);
     }
 
 
     private void executorCursor(MongoCursor<Document> cursor) {
-        while (cursor.hasNext() && !mongoReplicator.isPause()) {
+        while (cursor.hasNext() && !replicaSet.isPause()) {
             Document document = cursor.next();
-            ReplicationEvent event = DocumentConvertEvent.convert(document);
-            if (filter.filterEvent(event)) {
-                mongoReplicator.publishEvent(event);
+            ReplicationEvent event = EventConverter.convert(document, replicaSetConfig.getReplicaSetName());
+            if (replicaSetsContext.filterEvent(event)) {
+                replicaSetsContext.publishEvent(event, replicaSetConfig);
             }
         }
     }
diff --git a/src/main/java/org/apache/connect/mongo/replicator/event/DocumentConvertEvent.java b/src/main/java/org/apache/connect/mongo/replicator/event/DocumentConvertEvent.java
deleted file mode 100644
index a18aa52..0000000
--- a/src/main/java/org/apache/connect/mongo/replicator/event/DocumentConvertEvent.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.apache.connect.mongo.replicator.event;
-
-import org.bson.BsonTimestamp;
-import org.bson.Document;
-
-import java.util.Optional;
-
-import static org.apache.connect.mongo.replicator.Constants.*;
-
-
-public class DocumentConvertEvent {
-
-
-    public static ReplicationEvent convert(Document document) {
-
-        OperationType operationType = OperationType.getOperationType(document.getString(OPERATIONTYPE));
-        BsonTimestamp timestamp = (BsonTimestamp) document.get(TIMESTAMP);
-//                Long t = document.getLong("t");
-        Long h = document.getLong(HASH);
-        Integer v = document.getInteger(VERSION);
-        String nameSpace = document.getString(NAMESPACE);
-//                String uuid = document.getString("uuid");
-//                Date wall = document.getDate("wall");
-        Document operation = document.get(OPERATION, Document.class);
-        Document objectID = document.get(OBJECTID, Document.class);
-        return new ReplicationEvent(operationType, timestamp, v, h, nameSpace, Optional.ofNullable(operation), Optional.ofNullable(objectID), document);
-    }
-
-}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/event/EventConverter.java b/src/main/java/org/apache/connect/mongo/replicator/event/EventConverter.java
new file mode 100644
index 0000000..57b4ac8
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/replicator/event/EventConverter.java
@@ -0,0 +1,29 @@
+package org.apache.connect.mongo.replicator.event;
+
+import org.bson.BsonTimestamp;
+import org.bson.Document;
+
+import java.util.Optional;
+
+import static org.apache.connect.mongo.replicator.Constants.*;
+
+
+public class EventConverter {
+
+
+    public static ReplicationEvent convert(Document document, String replicaSetName) {
+
+        ReplicationEvent event = new ReplicationEvent();
+        event.setOperationType(OperationType.getOperationType(document.getString(OPERATIONTYPE)));
+        event.setTimestamp(document.get(TIMESTAMP, BsonTimestamp.class));
+        event.setH(document.getLong(HASH));
+        event.setV(document.getInteger(VERSION));
+        event.setNamespace(document.getString(NAMESPACE));
+        event.setEventData(Optional.ofNullable(document.get(OPERATION, Document.class)));
+        event.setObjectId(Optional.ofNullable(document.get(OBJECTID, Document.class)));
+        event.setReplicaSetName(replicaSetName);
+        event.setDocument(document);
+        return event;
+    }
+
+}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java b/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java
index 7719b3e..283e9d6 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java
@@ -19,6 +19,7 @@ public class ReplicationEvent {
     private String namespace;
     private Optional<Document> eventData;
     private Optional<Document> objectId;
+    private String replicaSetName;
 
 
     public ReplicationEvent() {
@@ -136,18 +137,29 @@ public class ReplicationEvent {
         this.objectId = objectId;
     }
 
+
+    public void setReplicaSetName(String replicaSetName) {
+        this.replicaSetName = replicaSetName;
+    }
+
+    public String getReplicaSetName() {
+        return replicaSetName;
+    }
+
     @Override
     public String toString() {
         return "ReplicationEvent{" +
-                "operationType=" + operationType +
+                "document=" + document +
+                ", operationType=" + operationType +
                 ", v=" + v +
                 ", h=" + h +
                 ", timestamp=" + timestamp +
                 ", databaseName='" + databaseName + '\'' +
                 ", collectionName='" + collectionName + '\'' +
                 ", namespace='" + namespace + '\'' +
-                ", eventData=" + eventData.toString() +
-                ", objectId=" + objectId.toString() +
+                ", eventData=" + eventData +
+                ", objectId=" + objectId +
+                ", replicaSetName='" + replicaSetName + '\'' +
                 '}';
     }
 }
diff --git a/src/test/java/org/apache/connect/mongo/FilterTest.java b/src/test/java/org/apache/connect/mongo/FilterTest.java
index 2b14b13..60e2514 100644
--- a/src/test/java/org/apache/connect/mongo/FilterTest.java
+++ b/src/test/java/org/apache/connect/mongo/FilterTest.java
@@ -17,12 +17,12 @@ import java.util.Map;
 public class FilterTest {
 
 
-    private MongoReplicatorConfig config;
+    private SourceTaskConfig sourceTaskConfig;
     private Map<String, List<String>> insterest;
 
     @Before
     public void init() {
-        config = new MongoReplicatorConfig();
+        sourceTaskConfig = new SourceTaskConfig();
         insterest = new HashMap<>();
     }
 
@@ -31,18 +31,18 @@ public class FilterTest {
         List<String> collections = new ArrayList<>();
         collections.add("person");
         insterest.put("test", collections);
-        config.setInterestDbAndCollection(JSONObject.toJSONString(insterest));
-        Filter filter = new Filter(config);
-        Assert.assertTrue(filter.filter(new CollectionMeta("test", "person")));
-        Assert.assertFalse(filter.filter(new CollectionMeta("test", "person01")));
+        sourceTaskConfig.setInterestDbAndCollection(JSONObject.toJSONString(insterest));
+        Filter filter = new Filter(sourceTaskConfig);
+        Assert.assertTrue(filter.filterMeta(new CollectionMeta("test", "person")));
+        Assert.assertFalse(filter.filterMeta(new CollectionMeta("test", "person01")));
     }
 
 
     @Test
     public void testBlankDb() {
-        Filter filter = new Filter(config);
-        Assert.assertTrue(filter.filter(new CollectionMeta("test" ,"test")));
-        Assert.assertTrue(filter.filter(new CollectionMeta("test1" ,"test01")));
+        Filter filter = new Filter(sourceTaskConfig);
+        Assert.assertTrue(filter.filterMeta(new CollectionMeta("test", "test")));
+        Assert.assertTrue(filter.filterMeta(new CollectionMeta("test1", "test01")));
     }
 
 
@@ -51,17 +51,17 @@ public class FilterTest {
         List<String> collections = new ArrayList<>();
         collections.add("*");
         insterest.put("test", collections);
-        config.setInterestDbAndCollection(JSONObject.toJSONString(insterest));
-        Filter filter = new Filter(config);
-        Assert.assertTrue(filter.filter(new CollectionMeta("test", "testsad")));
-        Assert.assertTrue(filter.filter(new CollectionMeta("test", "tests032")));
+        sourceTaskConfig.setInterestDbAndCollection(JSONObject.toJSONString(insterest));
+        Filter filter = new Filter(sourceTaskConfig);
+        Assert.assertTrue(filter.filterMeta(new CollectionMeta("test", "testsad")));
+        Assert.assertTrue(filter.filterMeta(new CollectionMeta("test", "tests032")));
     }
 
 
 
     @Test
     public void testFilterEvent() {
-        Filter filter = new Filter(config);
+        Filter filter = new Filter(sourceTaskConfig);
         ReplicationEvent replicationEvent = new ReplicationEvent();
         replicationEvent.setOperationType(OperationType.NOOP);
         Assert.assertFalse(filter.filterEvent(replicationEvent));
diff --git a/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java b/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java
index 04e1374..d921e63 100644
--- a/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java
+++ b/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java
@@ -1,22 +1,40 @@
 package org.apache.connect.mongo;
 
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SourceDataEntry;
 import io.openmessaging.internal.DefaultKeyValue;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.connect.mongo.connector.MongoSourceConnector;
 import org.apache.connect.mongo.connector.MongoSourceTask;
+import org.apache.connect.mongo.replicator.ReplicaSetConfig;
+import org.apache.connect.mongo.replicator.ReplicaSetsContext;
+import org.apache.connect.mongo.replicator.event.OperationType;
+import org.apache.connect.mongo.replicator.event.ReplicationEvent;
+import org.bson.BsonTimestamp;
+import org.bson.Document;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.LinkedBlockingQueue;
+
 public class MongoSourceConnectorTest {
 
     private MongoSourceConnector mongoSourceConnector;
     private DefaultKeyValue keyValue;
+    private SourceTaskConfig sourceTaskConfig;
 
     @Before
     public void before() {
         mongoSourceConnector = new MongoSourceConnector();
         keyValue = new DefaultKeyValue();
+        sourceTaskConfig = new SourceTaskConfig();
+
     }
 
     @Test
@@ -27,19 +45,58 @@ public class MongoSourceConnectorTest {
 
     @Test
     public void verifyConfig() {
-        keyValue.put("mongoAddr", "127.0.0.1");
+        keyValue.put("mongoAddr", "shardName=replicaName:127.0.0.1:27017");
         String s = mongoSourceConnector.verifyAndSetConfig(keyValue);
-        Assert.assertTrue(s.contains("Request config key:"));
-        keyValue.put("mongoPort", 27017);
-        s = mongoSourceConnector.verifyAndSetConfig(keyValue);
-        Assert.assertTrue(StringUtils.isBlank(s));
+        Assert.assertTrue(s.contains("Request sourceTaskConfig key:"));
     }
 
 
+    @Test
+    public void testPoll() throws Exception {
+        LinkedBlockingQueue<SourceDataEntry> entries = new LinkedBlockingQueue<>();
+        ReplicaSetsContext context = new ReplicaSetsContext(sourceTaskConfig);
+        Field dataEntryQueue = ReplicaSetsContext.class.getDeclaredField("dataEntryQueue");
+        dataEntryQueue.setAccessible(true);
+        dataEntryQueue.set(context, entries);
+        ReplicationEvent event = new ReplicationEvent();
+        event.setOperationType(OperationType.INSERT);
+        event.setNamespace("test.person");
+        event.setTimestamp(new BsonTimestamp(1565609506, 1));
+        event.setDocument(new Document("testKey", "testValue"));
+        event.setH(324243242L);
+        event.setEventData(Optional.ofNullable(new Document("testEventKey", "testEventValue")));
+        event.setObjectId(Optional.empty());
+        context.publishEvent(event, new ReplicaSetConfig("", "testReplicaName", "localhost:27017"));
+        List<SourceDataEntry> sourceDataEntries = (List<SourceDataEntry>) context.poll();
+        Assert.assertTrue(sourceDataEntries.size() == 1);
+
+        SourceDataEntry sourceDataEntry = sourceDataEntries.get(0);
+        Assert.assertEquals("test-person", sourceDataEntry.getQueueName());
+
 
+        ByteBuffer sourcePartition = sourceDataEntry.getSourcePartition();
+        Assert.assertEquals("testReplicaName", new String(sourcePartition.array()));
 
 
+        ByteBuffer sourcePosition = sourceDataEntry.getSourcePosition();
+        ReplicaSetConfig.Position position = JSONObject.parseObject(new String(sourcePosition.array()), ReplicaSetConfig.Position.class);
+        Assert.assertEquals(position.getTimeStamp(), 1565609506);
+        Assert.assertEquals(position.getInc(), 1);
+        Assert.assertEquals(position.isInitSync(), false);
 
 
+        EntryType entryType = sourceDataEntry.getEntryType();
+        Assert.assertEquals(EntryType.CREATE, entryType);
+
+        String queueName = sourceDataEntry.getQueueName();
+        Assert.assertEquals("test-person", queueName);
+
+        Schema schema = sourceDataEntry.getSchema();
+        Assert.assertTrue(schema.getFields().size() == 6);
+        Object[] payload = sourceDataEntry.getPayload();
+        Assert.assertTrue(payload.length == 6);
+
+    }
+
 
 }
diff --git a/src/test/java/org/apache/connect/mongo/MongoTest.java b/src/test/java/org/apache/connect/mongo/MongoTest.java
index cc83fbe..7b0291a 100644
--- a/src/test/java/org/apache/connect/mongo/MongoTest.java
+++ b/src/test/java/org/apache/connect/mongo/MongoTest.java
@@ -7,11 +7,14 @@ import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoClients;
 import com.mongodb.client.MongoCollection;
 import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SourceDataEntry;
 import org.apache.connect.mongo.initsync.InitSync;
 import org.apache.connect.mongo.replicator.Constants;
-import org.apache.connect.mongo.replicator.Filter;
-import org.apache.connect.mongo.replicator.MongoReplicator;
-import org.apache.connect.mongo.replicator.event.DocumentConvertEvent;
+import org.apache.connect.mongo.replicator.ReplicaSet;
+import org.apache.connect.mongo.replicator.ReplicaSetConfig;
+import org.apache.connect.mongo.replicator.ReplicaSetsContext;
+import org.apache.connect.mongo.replicator.event.EventConverter;
 import org.apache.connect.mongo.replicator.event.OperationType;
 import org.apache.connect.mongo.replicator.event.ReplicationEvent;
 import org.bson.BsonTimestamp;
@@ -21,12 +24,8 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.nio.ByteBuffer;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class MongoTest {
@@ -36,7 +35,7 @@ public class MongoTest {
     @Before
     public void before() {
         MongoClientSettings.Builder builder = MongoClientSettings.builder();
-        builder.applyConnectionString(new ConnectionString("mongodb://127.0.0.1:27018"));
+        builder.applyConnectionString(new ConnectionString("mongodb://127.0.0.1:27077"));
         mongoClient = MongoClients.create(builder.build());
     }
 
@@ -52,13 +51,14 @@ public class MongoTest {
         Document document = new Document();
         document.put("test", "test");
         oplog.put(Constants.OPERATION, document);
-        ReplicationEvent event = DocumentConvertEvent.convert(oplog);
+        ReplicationEvent event = EventConverter.convert(oplog, "testR");
         Assert.assertEquals(timestamp, event.getTimestamp());
         Assert.assertEquals("test.person", event.getNamespace());
         Assert.assertTrue(11111L == event.getH());
         Assert.assertEquals(OperationType.INSERT, event.getOperationType());
         Assert.assertEquals(EntryType.CREATE, event.getEntryType());
         Assert.assertEquals(document, event.getEventData().get());
+        Assert.assertEquals("testR", event.getReplicaSetName());
 
 
     }
@@ -68,38 +68,58 @@ public class MongoTest {
     public void testInitSyncCopy() throws NoSuchFieldException, IllegalAccessException, InterruptedException {
         MongoCollection<Document> collection = mongoClient.getDatabase("test").getCollection("person");
         collection.deleteMany(new Document());
-        int count = 1;
-        List<Document> documents = new ArrayList<>(count);
+        int count = 1000;
+        List<String> documents = new ArrayList<>(count);
         for (int i = 0; i < count; i++) {
             Document document = new Document();
             document.put("name", "test" + i);
             document.put("age", i);
             document.put("sex", i % 2 == 0 ? "boy" : "girl");
             collection.insertOne(document);
-            documents.add(document);
+            documents.add(document.getObjectId("_id").toHexString());
         }
-        MongoReplicatorConfig config = new MongoReplicatorConfig();
+        SourceTaskConfig sourceTaskConfig = new SourceTaskConfig();
         Map<String, List<String>> insterest = new HashMap<>();
         List<String> collections = new ArrayList<>();
         collections.add("*");
         insterest.put("test", collections);
-        config.setInterestDbAndCollection(JSONObject.toJSONString(insterest));
-        MongoReplicator mongoReplicator = new MongoReplicator(config);
-        Field running = MongoReplicator.class.getDeclaredField("running");
+        sourceTaskConfig.setInterestDbAndCollection(JSONObject.toJSONString(insterest));
+        ReplicaSetConfig replicaSetConfig = new ReplicaSetConfig("", "test", "localhost");
+        ReplicaSetsContext replicaSetsContext = new ReplicaSetsContext(sourceTaskConfig);
+        ReplicaSet replicaSet = new ReplicaSet(replicaSetConfig, replicaSetsContext);
+        Field running = ReplicaSet.class.getDeclaredField("running");
         running.setAccessible(true);
-        running.set(mongoReplicator, new AtomicBoolean(true));
-        InitSync initSync = new InitSync(config, mongoClient, new Filter(config), mongoReplicator);
+        running.set(replicaSet, new AtomicBoolean(true));
+        InitSync initSync = new InitSync(replicaSetConfig, mongoClient, replicaSetsContext, replicaSet);
         initSync.start();
-        BlockingQueue<ReplicationEvent> queue = mongoReplicator.getQueue();
-        while (count > 0) {
-            count--;
-            ReplicationEvent event = queue.poll(100, TimeUnit.MILLISECONDS);
-            Assert.assertTrue(event.getOperationType().equals(OperationType.CREATED));
-            Assert.assertNotNull(event.getDocument());
-            Assert.assertTrue(documents.contains(event.getDocument()));
-        }
-
+        int syncCount = 0;
+        while (syncCount < count) {
+            Collection<SourceDataEntry> sourceDataEntries = replicaSetsContext.poll();
+            Assert.assertNotNull(sourceDataEntries);
+            for (SourceDataEntry sourceDataEntry : sourceDataEntries) {
+                ByteBuffer sourcePartition = sourceDataEntry.getSourcePartition();
+                Assert.assertEquals("test", new String(sourcePartition.array()));
+                ByteBuffer sourcePosition = sourceDataEntry.getSourcePosition();
+                ReplicaSetConfig.Position position = replicaSetConfig.emptyPosition();
+                position.setInitSync(true);
+                position.setTimeStamp(0);
+                position.setInc(0);
+                Assert.assertEquals(position, JSONObject.parseObject(new String(sourcePosition.array()), ReplicaSetConfig.Position.class));
+                EntryType entryType = sourceDataEntry.getEntryType();
+                Assert.assertEquals(EntryType.CREATE, entryType);
+                String queueName = sourceDataEntry.getQueueName();
+                Assert.assertEquals("test-person", queueName);
+                Schema schema = sourceDataEntry.getSchema();
+                Assert.assertTrue(schema.getFields().size() == 2);
+                Object[] payload = sourceDataEntry.getPayload();
+                Assert.assertTrue(payload.length == 2);
+                Assert.assertEquals(payload[0].toString(), "test.person");
+                Assert.assertTrue(documents.contains(JSONObject.parseObject(payload[1].toString(), Document.class).get("_id", JSONObject.class).getString("$oid")));
+                syncCount++;
+            }
 
+        }
 
+        Assert.assertTrue(syncCount == count);
     }
 }
diff --git a/src/test/java/org/apache/connect/mongo/ReplicaContextTest.java b/src/test/java/org/apache/connect/mongo/ReplicaContextTest.java
new file mode 100644
index 0000000..8613c42
--- /dev/null
+++ b/src/test/java/org/apache/connect/mongo/ReplicaContextTest.java
@@ -0,0 +1,34 @@
+package org.apache.connect.mongo;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.MongoIterable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.connect.mongo.replicator.ReplicaSetConfig;
+import org.apache.connect.mongo.replicator.ReplicaSetsContext;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ReplicaContextTest {
+
+    private ReplicaSetsContext context;
+
+    @Before
+    public void before() {
+        SourceTaskConfig sourceTaskConfig = new SourceTaskConfig();
+        context = new ReplicaSetsContext(sourceTaskConfig);
+    }
+
+
+    @Test
+    public void testCreateMongoClient() {
+        MongoClient mongoClient = context.createMongoClient(new ReplicaSetConfig("shardName1", "rep1", "127.0.0.1:27017"));
+        MongoIterable<String> collectionNames = mongoClient.getDatabase("local").listCollectionNames();
+        MongoCursor<String> iterator = collectionNames.iterator();
+        while (iterator.hasNext()) {
+            Assert.assertTrue(StringUtils.isNoneBlank(iterator.next()));
+        }
+    }
+
+}
diff --git a/src/test/java/org/apache/connect/mongo/ReplicaSetsTest.java b/src/test/java/org/apache/connect/mongo/ReplicaSetsTest.java
new file mode 100644
index 0000000..e69eac6
--- /dev/null
+++ b/src/test/java/org/apache/connect/mongo/ReplicaSetsTest.java
@@ -0,0 +1,65 @@
+package org.apache.connect.mongo;
+
+import org.apache.connect.mongo.replicator.ReplicaSetConfig;
+import org.apache.connect.mongo.replicator.ReplicaSets;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class ReplicaSetsTest {
+
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testCreatReplicaSetsException01() {
+        ReplicaSets.create("");
+    }
+
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testCreatReplicaSetsException02() {
+        ReplicaSets.create("127.0.0.1:27081");
+    }
+
+
+    @Test
+    public void testCreatReplicaSets01() {
+        ReplicaSets replicaSets = ReplicaSets.create("replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083");
+        Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSets.getReplicaConfigByName();
+        Assert.assertTrue(replicaSetConfigMap.size() == 1);
+        Assert.assertNotNull(replicaSetConfigMap.get("replicaName1"));
+        Assert.assertEquals("127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083", replicaSetConfigMap.get("replicaName1").getHost());
+        Assert.assertEquals("replicaName1", replicaSetConfigMap.get("replicaName1").getReplicaSetName());
+    }
+
+
+    @Test
+    public void testCreatReplicaSets02() {
+        ReplicaSets replicaSets = ReplicaSets.create("shardName1=replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083");
+        Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSets.getReplicaConfigByName();
+        Assert.assertTrue(replicaSetConfigMap.size() == 1);
+        Assert.assertNotNull(replicaSetConfigMap.get("replicaName1"));
+        Assert.assertEquals("127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083", replicaSetConfigMap.get("replicaName1").getHost());
+        Assert.assertEquals("replicaName1", replicaSetConfigMap.get("replicaName1").getReplicaSetName());
+        Assert.assertEquals("shardName1", replicaSetConfigMap.get("replicaName1").getShardName());
+    }
+
+
+    @Test
+    public void testCreatReplicaSets03() {
+        ReplicaSets replicaSets = ReplicaSets.create("shardName1=replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083;shardName2=replicaName2/127.0.0.1:27281,127.0.0.1:27282,127.0.0.1:27283");
+        Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSets.getReplicaConfigByName();
+        Assert.assertTrue(replicaSetConfigMap.size() == 2);
+        Assert.assertNotNull(replicaSetConfigMap.get("replicaName1"));
+        Assert.assertEquals("127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083", replicaSetConfigMap.get("replicaName1").getHost());
+        Assert.assertEquals("replicaName1", replicaSetConfigMap.get("replicaName1").getReplicaSetName());
+        Assert.assertEquals("shardName1", replicaSetConfigMap.get("replicaName1").getShardName());
+
+
+        Assert.assertNotNull(replicaSetConfigMap.get("replicaName2"));
+        Assert.assertEquals("127.0.0.1:27281,127.0.0.1:27282,127.0.0.1:27283", replicaSetConfigMap.get("replicaName2").getHost());
+        Assert.assertEquals("replicaName2", replicaSetConfigMap.get("replicaName2").getReplicaSetName());
+        Assert.assertEquals("shardName2", replicaSetConfigMap.get("replicaName2").getShardName());
+    }
+
+}