You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:22:33 UTC

[rocketmq-connect] 10/13: fix some bug and add readme

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 2cdb27a2b2e418a1228c0d83adf6acff64e79554
Author: 李平 <17...@qq.com>
AuthorDate: Thu Aug 22 15:10:57 2019 +0800

    fix some bug and add readme
---
 README.md                                          |  22 +++
 .../org/apache/connect/mongo/SourceTaskConfig.java | 188 +++++++++++----------
 .../mongo/connector/MongoSourceConnector.java      |  17 ++
 .../connect/mongo/connector/MongoSourceTask.java   |  49 +++---
 .../mongo/connector/builder/MongoDataEntry.java    |  32 +++-
 .../connect/mongo/initsync/CollectionMeta.java     |  17 ++
 .../apache/connect/mongo/initsync/InitSync.java    |  27 ++-
 .../apache/connect/mongo/replicator/Constants.java |  23 ++-
 .../apache/connect/mongo/replicator/Filter.java    |  19 ++-
 .../mongo/replicator/MongoClientFactory.java       |  38 +++--
 .../apache/connect/mongo/replicator/Position.java  |  85 ++++++++++
 .../connect/mongo/replicator/ReplicaSet.java       |  32 +++-
 .../connect/mongo/replicator/ReplicaSetConfig.java |  93 ++--------
 .../{ReplicaSets.java => ReplicaSetManager.java}   |  29 +++-
 .../mongo/replicator/ReplicaSetsContext.java       |  29 +++-
 .../connect/mongo/replicator/ReplicatorTask.java   |  62 +++++--
 ...Converter.java => Document2EventConverter.java} |  27 ++-
 .../mongo/replicator/event/OperationType.java      |  54 ++++--
 .../mongo/replicator/event/ReplicationEvent.java   |  17 ++
 .../java/org/apache/connect/mongo/FilterTest.java  |   3 +-
 .../org/apache/connect/mongo/MongoFactoryTest.java |  22 +--
 .../connect/mongo/MongoSourceConnectorTest.java    |   3 +-
 .../apache/connect/mongo/MongoSourceTaskTest.java  |   3 +-
 .../java/org/apache/connect/mongo/MongoTest.java   |  25 ++-
 .../apache/connect/mongo/OperationTypeTest.java    |  37 ++++
 ...icaSetsTest.java => ReplicaSetManagerTest.java} |  20 +--
 26 files changed, 687 insertions(+), 286 deletions(-)

diff --git a/README.md b/README.md
index fb2c6c6..e32c455 100644
--- a/README.md
+++ b/README.md
@@ -10,3 +10,25 @@ and then init a mongo replicaSet
 
 `docker exec -it mongo-test mongo ` and `rs.initiate()` and then you can run all junit test
 
+
+
+## task config params
+
+| param | Description | type |
+| --- | --- | --- |
+| mongoAddr | shardName=replicaSetName/127.0.0.1:2781,127.0.0.1:2782,127.0.0.1:2783; | string, split by ; |
+| mongoUserName | mongo root username| string |
+| mongoPassWord | mongo root password| string |
+| interestDbAndCollection | {"dbName":["collection1","collection2"]}; a collectionName of "*" means all collections in that db | json |
+| positionTimeStamp | mongo oplog `bsontimestamp.time` to start from; a position already stored by the runtime takes precedence | int |
+| positionInc | mongo oplog `bsontimestamp.inc` to start from; a position already stored by the runtime takes precedence | int |
+| dataSync | whether to run an initial sync of all interestDbAndCollection data; a position already stored by the runtime takes precedence | boolean |
+| serverSelectionTimeoutMS | mongo driver select replicaServer timeout | long | 
+| connectTimeoutMS | mongo driver connect socket timeout | long |
+| socketTimeoutMS | mongo driver read or write timeout | long |
+| ssl or tsl | whether the mongo driver uses SSL/TLS | boolean |
+| tlsInsecure or sslInvalidHostNameAllowed | when SSL/TLS is used, whether the mongo driver allows invalid hostnames | boolean |
+| compressors | compression algorithm used by the mongo driver | string (zlib or snappy) |
+| zlibCompressionLevel | zlib compression level | int (1-7) |
+| trustStore | ssl pem| path|
+| trustStorePassword | ssl pem decrypt password | string|
diff --git a/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java b/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java
index 3df9dc4..d184b5c 100644
--- a/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java
+++ b/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.connect.mongo;
 
 import io.openmessaging.KeyValue;
@@ -9,22 +26,21 @@ import org.bson.BsonTimestamp;
 
 public class SourceTaskConfig {
 
-    private String replicaSet;
     private String mongoAddr;
     private String mongoUserName;
     private String mongoPassWord;
     private String interestDbAndCollection;
-    private String positionTimeStamp;
-    private String positionInc;
-    private String dataSync;
-    private String serverSelectionTimeoutMS;
-    private String connectTimeoutMS;
-    private String socketTimeoutMS;
-    private String ssl;
-    private String tsl;
-    private String tlsInsecure;
-    private String sslInvalidHostNameAllowed;
-    private String tlsAllowInvalidHostnames;
+    private int positionTimeStamp;
+    private int positionInc;
+    private boolean dataSync;
+    private long serverSelectionTimeoutMS;
+    private long connectTimeoutMS;
+    private long socketTimeoutMS;
+    private boolean ssl;
+    private boolean tsl;
+    private boolean tlsInsecure;
+    private boolean sslInvalidHostNameAllowed;
+    private boolean tlsAllowInvalidHostnames;
     private String compressors;
     private String zlibCompressionLevel;
     private String trustStore;
@@ -37,52 +53,28 @@ public class SourceTaskConfig {
         }
     });
 
-    public String getTrustStore() {
-        return trustStore;
-    }
-
-    public void setTrustStore(String trustStore) {
-        this.trustStore = trustStore;
-    }
-
-    public String getTrustStorePassword() {
-        return trustStorePassword;
-    }
-
-    public void setTrustStorePassword(String trustStorePassword) {
-        this.trustStorePassword = trustStorePassword;
-    }
-
-    public String getZlibCompressionLevel() {
-        return zlibCompressionLevel;
-    }
-
-    public void setZlibCompressionLevel(String zlibCompressionLevel) {
-        this.zlibCompressionLevel = zlibCompressionLevel;
-    }
-
-    public String getPositionInc() {
-        return positionInc;
+    public String getMongoAddr() {
+        return mongoAddr;
     }
 
-    public void setPositionInc(String positionInc) {
-        this.positionInc = positionInc;
+    public void setMongoAddr(String mongoAddr) {
+        this.mongoAddr = mongoAddr;
     }
 
-    public int getCopyThread() {
-        return copyThread;
+    public String getMongoUserName() {
+        return mongoUserName;
     }
 
-    public void setCopyThread(int copyThread) {
-        this.copyThread = copyThread;
+    public void setMongoUserName(String mongoUserName) {
+        this.mongoUserName = mongoUserName;
     }
 
-    public String getPositionTimeStamp() {
-        return positionTimeStamp;
+    public String getMongoPassWord() {
+        return mongoPassWord;
     }
 
-    public void setPositionTimeStamp(String positionTimeStamp) {
-        this.positionTimeStamp = positionTimeStamp;
+    public void setMongoPassWord(String mongoPassWord) {
+        this.mongoPassWord = mongoPassWord;
     }
 
     public String getInterestDbAndCollection() {
@@ -93,107 +85,91 @@ public class SourceTaskConfig {
         this.interestDbAndCollection = interestDbAndCollection;
     }
 
-    public String getMongoAddr() {
-        return mongoAddr;
-    }
-
-    public void setMongoAddr(String mongoAddr) {
-        this.mongoAddr = mongoAddr;
-    }
-
-    public String getMongoUserName() {
-        return mongoUserName;
+    public int getPositionTimeStamp() {
+        return positionTimeStamp;
     }
 
-    public void setMongoUserName(String mongoUserName) {
-        this.mongoUserName = mongoUserName;
+    public void setPositionTimeStamp(int positionTimeStamp) {
+        this.positionTimeStamp = positionTimeStamp;
     }
 
-    public String getMongoPassWord() {
-        return mongoPassWord;
+    public int getPositionInc() {
+        return positionInc;
     }
 
-    public void setMongoPassWord(String mongoPassWord) {
-        this.mongoPassWord = mongoPassWord;
+    public void setPositionInc(int positionInc) {
+        this.positionInc = positionInc;
     }
 
-    public String getDataSync() {
+    public boolean isDataSync() {
         return dataSync;
     }
 
-    public void setDataSync(String dataSync) {
+    public void setDataSync(boolean dataSync) {
         this.dataSync = dataSync;
     }
 
-    public String getReplicaSet() {
-        return replicaSet;
-    }
-
-    public String getServerSelectionTimeoutMS() {
+    public long getServerSelectionTimeoutMS() {
         return serverSelectionTimeoutMS;
     }
 
-    public void setServerSelectionTimeoutMS(String serverSelectionTimeoutMS) {
+    public void setServerSelectionTimeoutMS(long serverSelectionTimeoutMS) {
         this.serverSelectionTimeoutMS = serverSelectionTimeoutMS;
     }
 
-    public void setReplicaSet(String replicaSet) {
-        this.replicaSet = replicaSet;
-    }
-
-    public String getConnectTimeoutMS() {
+    public long getConnectTimeoutMS() {
         return connectTimeoutMS;
     }
 
-    public void setConnectTimeoutMS(String connectTimeoutMS) {
+    public void setConnectTimeoutMS(long connectTimeoutMS) {
         this.connectTimeoutMS = connectTimeoutMS;
     }
 
-    public String getSocketTimeoutMS() {
+    public long getSocketTimeoutMS() {
         return socketTimeoutMS;
     }
 
-    public void setSocketTimeoutMS(String socketTimeoutMS) {
+    public void setSocketTimeoutMS(long socketTimeoutMS) {
         this.socketTimeoutMS = socketTimeoutMS;
     }
 
-    public String getSsl() {
+    public boolean getSsl() {
         return ssl;
     }
 
-    public void setSsl(String ssl) {
+    public void setSsl(boolean ssl) {
         this.ssl = ssl;
     }
 
-    public String getTsl() {
+    public boolean getTsl() {
         return tsl;
     }
 
-    public void setTsl(String tsl) {
+    public void setTsl(boolean tsl) {
         this.tsl = tsl;
     }
 
-    public String getTlsInsecure() {
+    public boolean getTlsInsecure() {
         return tlsInsecure;
     }
 
-    public void setTlsInsecure(String tlsInsecure) {
+    public void setTlsInsecure(boolean tlsInsecure) {
         this.tlsInsecure = tlsInsecure;
     }
 
-    public String getSslInvalidHostNameAllowed() {
+    public boolean getSslInvalidHostNameAllowed() {
         return sslInvalidHostNameAllowed;
     }
 
-    public void setSslInvalidHostNameAllowed(String sslInvalidHostNameAllowed) {
+    public void setSslInvalidHostNameAllowed(boolean sslInvalidHostNameAllowed) {
         this.sslInvalidHostNameAllowed = sslInvalidHostNameAllowed;
     }
 
-    public String getTlsAllowInvalidHostnames() {
+    public boolean getTlsAllowInvalidHostnames() {
         return tlsAllowInvalidHostnames;
     }
 
-    public void setTlsAllowInvalidHostnames(String tlsAllowInvalidHostnames) {
+    public void setTlsAllowInvalidHostnames(boolean tlsAllowInvalidHostnames) {
         this.tlsAllowInvalidHostnames = tlsAllowInvalidHostnames;
     }
 
@@ -205,6 +181,38 @@ public class SourceTaskConfig {
         this.compressors = compressors;
     }
 
+    public String getZlibCompressionLevel() {
+        return zlibCompressionLevel;
+    }
+
+    public void setZlibCompressionLevel(String zlibCompressionLevel) {
+        this.zlibCompressionLevel = zlibCompressionLevel;
+    }
+
+    public String getTrustStore() {
+        return trustStore;
+    }
+
+    public void setTrustStore(String trustStore) {
+        this.trustStore = trustStore;
+    }
+
+    public String getTrustStorePassword() {
+        return trustStorePassword;
+    }
+
+    public void setTrustStorePassword(String trustStorePassword) {
+        this.trustStorePassword = trustStorePassword;
+    }
+
+    public int getCopyThread() {
+        return copyThread;
+    }
+
+    public void setCopyThread(int copyThread) {
+        this.copyThread = copyThread;
+    }
+
     public void load(KeyValue props) {
 
         properties2Object(props, this);
diff --git a/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java b/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java
index e3dfb6f..5be2e0d 100644
--- a/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java
+++ b/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.connect.mongo.connector;
 
 import io.openmessaging.KeyValue;
diff --git a/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
index da244cd..49bcf49 100644
--- a/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
+++ b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.connect.mongo.connector;
 
 import com.alibaba.fastjson.JSONObject;
@@ -7,13 +24,10 @@ import io.openmessaging.connector.api.source.SourceTask;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
-import java.util.regex.Pattern;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.connect.mongo.SourceTaskConfig;
-import org.apache.connect.mongo.replicator.Constants;
+import org.apache.connect.mongo.replicator.Position;
 import org.apache.connect.mongo.replicator.ReplicaSet;
-import org.apache.connect.mongo.replicator.ReplicaSetConfig;
-import org.apache.connect.mongo.replicator.ReplicaSets;
+import org.apache.connect.mongo.replicator.ReplicaSetManager;
 import org.apache.connect.mongo.replicator.ReplicaSetsContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -24,12 +38,10 @@ public class MongoSourceTask extends SourceTask {
 
     private SourceTaskConfig sourceTaskConfig;
 
-    private ReplicaSets replicaSets;
+    private ReplicaSetManager replicaSetManager;
 
     private ReplicaSetsContext replicaSetsContext;
 
-    private Pattern pattern = Pattern.compile("^[-\\+]?[\\d]*$");
-
     @Override
     public Collection<SourceDataEntry> poll() {
 
@@ -41,24 +53,23 @@ public class MongoSourceTask extends SourceTask {
         try {
             sourceTaskConfig = new SourceTaskConfig();
             sourceTaskConfig.load(config);
+
             replicaSetsContext = new ReplicaSetsContext(sourceTaskConfig);
-            replicaSets = ReplicaSets.create(sourceTaskConfig.getMongoAddr());
-            replicaSets.getReplicaConfigByName().forEach((replicaSetName, replicaSetConfig) -> {
+
+            replicaSetManager = ReplicaSetManager.create(sourceTaskConfig.getMongoAddr());
+
+            replicaSetManager.getReplicaConfigByName().forEach((replicaSetName, replicaSetConfig) -> {
                 ByteBuffer byteBuffer = this.context.positionStorageReader().getPosition(ByteBuffer.wrap(
                     replicaSetName.getBytes()));
                 if (byteBuffer != null && byteBuffer.array().length > 0) {
                     String positionJson = new String(byteBuffer.array(), StandardCharsets.UTF_8);
-                    ReplicaSetConfig.Position position = JSONObject.parseObject(positionJson, ReplicaSetConfig.Position.class);
+                    Position position = JSONObject.parseObject(positionJson, Position.class);
                     replicaSetConfig.setPosition(position);
                 } else {
-                    ReplicaSetConfig.Position position = replicaSetConfig.emptyPosition();
-                    position.setTimeStamp(sourceTaskConfig.getPositionTimeStamp() != null
-                        && pattern.matcher(sourceTaskConfig.getPositionTimeStamp()).matches()
-                        ? Integer.valueOf(sourceTaskConfig.getPositionTimeStamp()) : 0);
-                    position.setInc(sourceTaskConfig.getPositionInc() != null
-                        && pattern.matcher(sourceTaskConfig.getPositionInc()).matches()
-                        ? Integer.valueOf(sourceTaskConfig.getPositionInc()) : 0);
-                    position.setInitSync(StringUtils.equals(sourceTaskConfig.getDataSync(), Constants.INITSYNC) ? true : false);
+                    Position position = new Position();
+                    position.setTimeStamp(sourceTaskConfig.getPositionTimeStamp());
+                    position.setInc(sourceTaskConfig.getPositionInc());
+                    position.setInitSync(sourceTaskConfig.isDataSync());
                     replicaSetConfig.setPosition(position);
                 }
 
diff --git a/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java b/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java
index 87d92ea..1d6dfe5 100644
--- a/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java
+++ b/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.connect.mongo.connector.builder;
 
 import com.alibaba.fastjson.JSONObject;
@@ -10,6 +27,7 @@ import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import org.apache.connect.mongo.replicator.Constants;
+import org.apache.connect.mongo.replicator.Position;
 import org.apache.connect.mongo.replicator.ReplicaSetConfig;
 import org.apache.connect.mongo.replicator.event.OperationType;
 import org.apache.connect.mongo.replicator.event.ReplicationEvent;
@@ -17,8 +35,8 @@ import org.bson.BsonTimestamp;
 
 import static org.apache.connect.mongo.replicator.Constants.CREATED;
 import static org.apache.connect.mongo.replicator.Constants.NAMESPACE;
-import static org.apache.connect.mongo.replicator.Constants.OBJECTID;
-import static org.apache.connect.mongo.replicator.Constants.OPERATIONTYPE;
+import static org.apache.connect.mongo.replicator.Constants.OBJECT_ID;
+import static org.apache.connect.mongo.replicator.Constants.OPERATION_TYPE;
 import static org.apache.connect.mongo.replicator.Constants.PATCH;
 import static org.apache.connect.mongo.replicator.Constants.TIMESTAMP;
 import static org.apache.connect.mongo.replicator.Constants.VERSION;
@@ -48,12 +66,12 @@ public class MongoDataEntry {
             dataEntryBuilder.timestamp(System.currentTimeMillis())
                 .queue(event.getNamespace().replace(".", "-").replace("$", "-"))
                 .entryType(event.getEntryType());
-            dataEntryBuilder.putFiled(OPERATIONTYPE, event.getOperationType().name());
+            dataEntryBuilder.putFiled(OPERATION_TYPE, event.getOperationType().name());
             dataEntryBuilder.putFiled(TIMESTAMP, event.getTimestamp().getValue());
             dataEntryBuilder.putFiled(VERSION, event.getV());
             dataEntryBuilder.putFiled(NAMESPACE, event.getNamespace());
             dataEntryBuilder.putFiled(PATCH, event.getEventData().isPresent() ? JSONObject.toJSONString(event.getEventData().get()) : "");
-            dataEntryBuilder.putFiled(OBJECTID, event.getObjectId().isPresent() ? JSONObject.toJSONString(event.getObjectId().get()) : "");
+            dataEntryBuilder.putFiled(OBJECT_ID, event.getObjectId().isPresent() ? JSONObject.toJSONString(event.getObjectId().get()) : "");
         }
 
         String position = createPosition(event, replicaSetConfig);
@@ -64,7 +82,7 @@ public class MongoDataEntry {
     }
 
     private static String createPosition(ReplicationEvent event, ReplicaSetConfig replicaSetConfig) {
-        ReplicaSetConfig.Position position = replicaSetConfig.emptyPosition();
+        Position position = new Position();
         BsonTimestamp timestamp = event.getTimestamp();
         position.setInc(timestamp != null ? timestamp.getInc() : 0);
         position.setTimeStamp(timestamp != null ? timestamp.getTime() : 0);
@@ -99,7 +117,7 @@ public class MongoDataEntry {
 
     private static void oplogField(Schema schema) {
         schema.setFields(new ArrayList<>());
-        Field op = new Field(0, OPERATIONTYPE, FieldType.STRING);
+        Field op = new Field(0, OPERATION_TYPE, FieldType.STRING);
         schema.getFields().add(op);
         Field time = new Field(1, TIMESTAMP, FieldType.INT64);
         schema.getFields().add(time);
@@ -109,7 +127,7 @@ public class MongoDataEntry {
         schema.getFields().add(namespace);
         Field patch = new Field(4, PATCH, FieldType.STRING);
         schema.getFields().add(patch);
-        Field objectId = new Field(5, OBJECTID, FieldType.STRING);
+        Field objectId = new Field(5, OBJECT_ID, FieldType.STRING);
         schema.getFields().add(objectId);
     }
 
diff --git a/src/main/java/org/apache/connect/mongo/initsync/CollectionMeta.java b/src/main/java/org/apache/connect/mongo/initsync/CollectionMeta.java
index 9c73eac..4af5060 100644
--- a/src/main/java/org/apache/connect/mongo/initsync/CollectionMeta.java
+++ b/src/main/java/org/apache/connect/mongo/initsync/CollectionMeta.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.connect.mongo.initsync;
 
 public class CollectionMeta {
diff --git a/src/main/java/org/apache/connect/mongo/initsync/InitSync.java b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
index 6cdbe06..3d68fac 100644
--- a/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
+++ b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.connect.mongo.initsync;
 
 import com.mongodb.client.MongoClient;
@@ -13,7 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.connect.mongo.replicator.ReplicaSet;
 import org.apache.connect.mongo.replicator.ReplicaSetConfig;
 import org.apache.connect.mongo.replicator.ReplicaSetsContext;
-import org.apache.connect.mongo.replicator.event.EventConverter;
+import org.apache.connect.mongo.replicator.event.Document2EventConverter;
 import org.apache.connect.mongo.replicator.event.OperationType;
 import org.apache.connect.mongo.replicator.event.ReplicationEvent;
 import org.bson.Document;
@@ -115,24 +132,24 @@ public class InitSync {
                     .batchSize(200)
                     .iterator();
                 while (replicaSet.isRuning() && mongoCursor.hasNext()) {
-                    if (context.initSyncAbort()) {
+                    if (context.isInitSyncAbort()) {
                         logger.info("init sync database:{}, collection:{} abort, has copy:{} document", collectionMeta.getDatabaseName(), collectionMeta.getCollectionName(), count);
                         return;
                     }
                     count++;
                     Document document = mongoCursor.next();
-                    ReplicationEvent event = EventConverter.convert(document, replicaSetConfig.getReplicaSetName());
+                    ReplicationEvent event = Document2EventConverter.convert(document, replicaSetConfig.getReplicaSetName());
                     event.setOperationType(OperationType.CREATED);
                     event.setNamespace(collectionMeta.getNameSpace());
                     context.publishEvent(event, replicaSetConfig);
                 }
 
             } catch (Exception e) {
-                context.initSyncError();
+                context.setInitSyncError();
+                replicaSet.shutdown();
                 logger.error("init sync database:{}, collection:{} error", collectionMeta.getDatabaseName(), collectionMeta.getNameSpace(), e);
             } finally {
                 countDownLatch.countDown();
-                replicaSet.shutdown();
             }
             logger.info("database:{}, collection:{}, copy {} documents, init sync done", collectionMeta.getDatabaseName(), collectionMeta.getCollectionName(), count);
         }
diff --git a/src/main/java/org/apache/connect/mongo/replicator/Constants.java b/src/main/java/org/apache/connect/mongo/replicator/Constants.java
index c895bd6..7ba1ac4 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/Constants.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/Constants.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.connect.mongo.replicator;
 
 public class Constants {
@@ -5,17 +22,15 @@ public class Constants {
     public static final String MONGO_LOCAL_DATABASE = "local";
     public static final String MONGO_OPLOG_RS = "oplog.rs";
 
-    public static final String OPERATIONTYPE = "op";
+    public static final String OPERATION_TYPE = "op";
     public static final String TIMESTAMP = "ts";
     public static final String VERSION = "v";
     public static final String HASH = "h";
     public static final String NAMESPACE = "ns";
     public static final String OPERATION = "o";
-    public static final String OBJECTID = "o2";
+    public static final String OBJECT_ID = "o2";
 
     public static final String CREATED = "created";
     public static final String PATCH = "patch";
 
-    public static final String INITSYNC = "initSync";
-
 }
diff --git a/src/main/java/org/apache/connect/mongo/replicator/Filter.java b/src/main/java/org/apache/connect/mongo/replicator/Filter.java
index fd26163..a517822 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/Filter.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/Filter.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.connect.mongo.replicator;
 
 import com.alibaba.fastjson.JSONObject;
@@ -49,7 +66,7 @@ public class Filter {
             return false;
         };
 
-        notNoopFilter = (opeartionType) -> opeartionType.ordinal() != OperationType.NOOP.ordinal();
+        notNoopFilter = (operationType) -> !operationType.equals(OperationType.NOOP);
     }
 
     public boolean filterMeta(CollectionMeta collectionMeta) {
diff --git a/src/main/java/org/apache/connect/mongo/replicator/MongoClientFactory.java b/src/main/java/org/apache/connect/mongo/replicator/MongoClientFactory.java
index f5e01a3..11bca8f 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/MongoClientFactory.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/MongoClientFactory.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.connect.mongo.replicator;
 
 import com.mongodb.ConnectionString;
@@ -38,46 +55,46 @@ public class MongoClientFactory {
             sb.append(replicaSetConfig.getReplicaSetName());
         }
 
-        if (StringUtils.isNotBlank(taskConfig.getServerSelectionTimeoutMS())) {
+        if (taskConfig.getServerSelectionTimeoutMS() > 0) {
             sb.append("&");
             sb.append("serverSelectionTimeoutMS=");
             sb.append(taskConfig.getServerSelectionTimeoutMS());
         }
 
-        if (StringUtils.isNotBlank(taskConfig.getConnectTimeoutMS())) {
+        if (taskConfig.getConnectTimeoutMS() > 0) {
             sb.append("&");
             sb.append("connectTimeoutMS=");
             sb.append(taskConfig.getConnectTimeoutMS());
         }
 
-        if (StringUtils.isNotBlank(taskConfig.getSocketTimeoutMS())) {
+        if (taskConfig.getSocketTimeoutMS() > 0) {
             sb.append("&");
             sb.append("socketTimeoutMS=");
             sb.append(taskConfig.getSocketTimeoutMS());
         }
 
-        if (StringUtils.isNotBlank(taskConfig.getSsl()) || StringUtils.isNotBlank(taskConfig.getTsl())) {
+        if (taskConfig.getSsl() || taskConfig.getTsl()) {
             sb.append("&");
             sb.append("ssl=");
             sb.append(true);
         }
 
-        if (StringUtils.isNotBlank(taskConfig.getTlsInsecure())) {
+        if (taskConfig.getTlsInsecure()) {
             sb.append("&");
             sb.append("tlsInsecure=");
-            sb.append(taskConfig.getTlsInsecure());
+            sb.append(true);
         }
 
-        if (StringUtils.isNotBlank(taskConfig.getTlsAllowInvalidHostnames())) {
+        if (taskConfig.getTlsAllowInvalidHostnames()) {
             sb.append("&");
             sb.append("tlsAllowInvalidHostnames=");
-            sb.append(taskConfig.getTlsAllowInvalidHostnames());
+            sb.append(true);
         }
 
-        if (StringUtils.isNotBlank(taskConfig.getSslInvalidHostNameAllowed())) {
+        if (taskConfig.getSslInvalidHostNameAllowed()) {
             sb.append("&");
             sb.append("sslInvalidHostNameAllowed=");
-            sb.append(taskConfig.getSslInvalidHostNameAllowed());
+            sb.append(true);
         }
 
         if (StringUtils.isNotBlank(taskConfig.getCompressors())) {
@@ -105,7 +122,6 @@ public class MongoClientFactory {
         }
 
         logger.info("connection string :{}", sb.toString());
-        System.out.println(sb.toString());
         ConnectionString connectionString = new ConnectionString(sb.toString());
         return MongoClients.create(connectionString);
     }
diff --git a/src/main/java/org/apache/connect/mongo/replicator/Position.java b/src/main/java/org/apache/connect/mongo/replicator/Position.java
new file mode 100644
index 0000000..29fd856
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/replicator/Position.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.connect.mongo.replicator;
+
+import java.util.Objects;
+import org.bson.BsonTimestamp;
+
+/**
+ * Replication position inside the MongoDB oplog: the seconds and increment parts of a
+ * {@link BsonTimestamp}, plus a flag that asks for an initial full data sync.
+ */
+public class Position {
+
+    private int timeStamp;
+    private int inc;
+    private boolean initSync;
+
+    public int getTimeStamp() {
+        return timeStamp;
+    }
+
+    public void setTimeStamp(int timeStamp) {
+        this.timeStamp = timeStamp;
+    }
+
+    public int getInc() {
+        return inc;
+    }
+
+    public void setInc(int inc) {
+        this.inc = inc;
+    }
+
+    public boolean isInitSync() {
+        return initSync;
+    }
+
+    public void setInitSync(boolean initSync) {
+        this.initSync = initSync;
+    }
+
+    /** Creates a position with all fields at their Java defaults; {@link #isValid()} is false for it. */
+    public Position() {
+    }
+
+    public Position(int timeStamp, int inc, boolean initSync) {
+        this.timeStamp = timeStamp;
+        this.inc = inc;
+        this.initSync = initSync;
+    }
+
+    /** @return true only when both the timestamp and the increment are positive */
+    public boolean isValid() {
+        return timeStamp > 0 && inc > 0;
+    }
+
+    /** @return this position as a {@link BsonTimestamp} built from timeStamp and inc */
+    public BsonTimestamp converBsonTimeStamp() {
+        return new BsonTimestamp(timeStamp, inc);
+    }
+
+    /** Readable form for log output; the default Object#toString would only show an identity hash. */
+    @Override
+    public String toString() {
+        return "Position{" +
+            "timeStamp=" + timeStamp +
+            ", inc=" + inc +
+            ", initSync=" + initSync +
+            '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Position position = (Position) o;
+        return timeStamp == position.timeStamp &&
+            inc == position.inc &&
+            initSync == position.initSync;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(timeStamp, inc, initSync);
+    }
+}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java
index 8f4d0d8..8393316 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.connect.mongo.replicator;
 
 import com.mongodb.client.MongoClient;
@@ -38,14 +55,14 @@ public class ReplicaSet {
     }
 
     public void start() {
+        if (!running.compareAndSet(false, true)) {
+            logger.info("the java mongo replica already start");
+            return;
+        }
 
         try {
-            if (!running.compareAndSet(false, true)) {
-                logger.info("the java mongo replica already start");
-                return;
-            }
             this.mongoClient = replicaSetsContext.createMongoClient(replicaSetConfig);
-            this.isReplicaMongo();
+            this.checkReplicaMongo();
             executorService.submit(new ReplicatorTask(this, mongoClient, replicaSetConfig, replicaSetsContext));
         } catch (Exception e) {
             logger.error("start replicator:{} error", replicaSetConfig, e);
@@ -53,16 +70,15 @@ public class ReplicaSet {
         }
     }
 
-    public boolean isReplicaMongo() {
+    public void checkReplicaMongo() {
         MongoDatabase local = mongoClient.getDatabase(MONGO_LOCAL_DATABASE);
         MongoIterable<String> collectionNames = local.listCollectionNames();
         MongoCursor<String> iterator = collectionNames.iterator();
         while (iterator.hasNext()) {
             if (StringUtils.equals(MONGO_OPLOG_RS, iterator.next())) {
-                return true;
+                return;
             }
         }
-        this.shutdown();
         throw new IllegalStateException(String.format("url:%s,  is not replica", replicaSetConfig.getHost()));
     }
 
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java
index 1b54b17..ced90b8 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java
@@ -1,7 +1,21 @@
-package org.apache.connect.mongo.replicator;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
-import java.util.Objects;
-import org.bson.BsonTimestamp;
+package org.apache.connect.mongo.replicator;
 
 public class ReplicaSetConfig {
 
@@ -48,10 +62,6 @@ public class ReplicaSetConfig {
         this.host = host;
     }
 
-    public Position emptyPosition() {
-        return new Position(0, 0, true);
-    }
-
     @Override
     public String toString() {
         return "ReplicaSetConfig{" +
@@ -62,73 +72,4 @@ public class ReplicaSetConfig {
             '}';
     }
 
-    public class Position {
-        private int timeStamp;
-        private int inc;
-        private boolean initSync;
-
-        public int getTimeStamp() {
-            return timeStamp;
-        }
-
-        public void setTimeStamp(int timeStamp) {
-            this.timeStamp = timeStamp;
-        }
-
-        public int getInc() {
-            return inc;
-        }
-
-        public void setInc(int inc) {
-            this.inc = inc;
-        }
-
-        public boolean isInitSync() {
-            return initSync;
-        }
-
-        public void setInitSync(boolean initSync) {
-            this.initSync = initSync;
-        }
-
-        public Position(int timeStamp, int inc, boolean initSync) {
-            this.timeStamp = timeStamp;
-            this.inc = inc;
-            this.initSync = initSync;
-        }
-
-        public boolean isValid() {
-            return timeStamp > 0;
-        }
-
-        public BsonTimestamp converBsonTimeStamp() {
-            return new BsonTimestamp(timeStamp, inc);
-        }
-
-        @Override
-        public String toString() {
-            return "Position{" +
-                "timeStamp=" + timeStamp +
-                ", inc=" + inc +
-                ", initSync=" + initSync +
-                '}';
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o)
-                return true;
-            if (o == null || getClass() != o.getClass())
-                return false;
-            Position position = (Position) o;
-            return timeStamp == position.timeStamp &&
-                inc == position.inc &&
-                initSync == position.initSync;
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(timeStamp, inc, initSync);
-        }
-    }
 }
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSets.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetManager.java
similarity index 65%
rename from src/main/java/org/apache/connect/mongo/replicator/ReplicaSets.java
rename to src/main/java/org/apache/connect/mongo/replicator/ReplicaSetManager.java
index af1ebeb..c5757d8 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSets.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetManager.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.connect.mongo.replicator;
 
 import java.util.HashMap;
@@ -9,13 +26,15 @@ import java.util.regex.Pattern;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.Validate;
 
-public class ReplicaSets {
+public class ReplicaSetManager {
 
     private static final Pattern HOST_PATTERN = Pattern.compile("((([^=]+)[=])?(([^/]+)\\/))?(.+)");
 
+    private static final String HOST_SEPARATOR = ";";
+
     private final Map<String, ReplicaSetConfig> replicaConfigByName = new HashMap<>();
 
-    public ReplicaSets(Set<ReplicaSetConfig> replicaSetConfigs) {
+    public ReplicaSetManager(Set<ReplicaSetConfig> replicaSetConfigs) {
         replicaSetConfigs.forEach(replicaSetConfig -> {
             if (StringUtils.isNotBlank(replicaSetConfig.getReplicaSetName())) {
                 replicaConfigByName.put(replicaSetConfig.getReplicaSetName(), replicaSetConfig);
@@ -25,10 +44,10 @@ public class ReplicaSets {
         validate();
     }
 
-    public static ReplicaSets create(String hosts) {
+    public static ReplicaSetManager create(String hosts) {
         Set<ReplicaSetConfig> replicaSetConfigs = new HashSet<>();
         if (hosts != null) {
-            for (String replicaSetStr : StringUtils.split(hosts.trim(), ";")) {
+            for (String replicaSetStr : StringUtils.split(hosts.trim(), HOST_SEPARATOR)) {
                 if (StringUtils.isNotBlank(replicaSetStr)) {
                     ReplicaSetConfig replicaSetConfig = parseReplicaSetStr(replicaSetStr);
                     if (replicaSetConfig != null) {
@@ -37,7 +56,7 @@ public class ReplicaSets {
                 }
             }
         }
-        return new ReplicaSets(replicaSetConfigs);
+        return new ReplicaSetManager(replicaSetConfigs);
     }
 
     private static ReplicaSetConfig parseReplicaSetStr(String hosts) {
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java
index e599f5b..b067256 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java
@@ -1,12 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.connect.mongo.replicator;
 
 import com.mongodb.client.MongoClient;
 import io.openmessaging.connector.api.data.SourceDataEntry;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.connect.mongo.SourceTaskConfig;
@@ -28,9 +46,12 @@ public class ReplicaSetsContext {
 
     private MongoClientFactory mongoClientFactory;
 
+    private Map<String, Position> lastPositionMap;
+
     public ReplicaSetsContext(SourceTaskConfig taskConfig) {
         this.taskConfig = taskConfig;
-        this.replicaSets = new CopyOnWriteArrayList<>();
+        this.replicaSets = new ArrayList<>();
+        this.lastPositionMap = new HashMap<>();
         this.dataEntryQueue = new LinkedBlockingDeque<>();
         this.filter = new Filter(taskConfig);
         this.mongoClientFactory = new MongoClientFactory(taskConfig);
@@ -91,11 +112,11 @@ public class ReplicaSetsContext {
         return res;
     }
 
-    public boolean initSyncAbort() {
+    public boolean isInitSyncAbort() {
         return initSyncAbort.get();
     }
 
-    public void initSyncError() {
+    public void setInitSyncError() {
         initSyncAbort.set(true);
     }
 
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
index 6cb46d1..4c142ce 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.connect.mongo.replicator;
 
 import com.mongodb.CursorType;
@@ -7,8 +24,9 @@ import com.mongodb.client.MongoCursor;
 import com.mongodb.client.MongoDatabase;
 import com.mongodb.client.model.Filters;
 import org.apache.connect.mongo.initsync.InitSync;
-import org.apache.connect.mongo.replicator.event.EventConverter;
+import org.apache.connect.mongo.replicator.event.Document2EventConverter;
 import org.apache.connect.mongo.replicator.event.ReplicationEvent;
+import org.bson.BsonTimestamp;
 import org.bson.Document;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,19 +54,27 @@ public class ReplicatorTask implements Runnable {
     @Override
     public void run() {
 
-        if (replicaSetConfig.getPosition() == null || replicaSetConfig.getPosition().isInitSync()) {
+        BsonTimestamp firstAvailablePosition = findOplogFirstPosition();
+
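+        // Run a full data sync when the configured position is invalid, when the user explicitly
+        // requested an initial sync, or when the position (user-configured or saved at runtime) is older
+        // than the first oplog entry, because operations in between may already be lost.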
+        if (!replicaSetConfig.getPosition().isValid() || replicaSetConfig.getPosition().isInitSync()
+            || replicaSetConfig.getPosition().converBsonTimeStamp().compareTo(firstAvailablePosition) < 0) {
+            recordOplogLastPosition();
             InitSync initSync = new InitSync(replicaSetConfig, mongoClient, replicaSetsContext, replicaSet);
             initSync.start();
+
         }
 
-        MongoDatabase localDataBase = mongoClient.getDatabase(Constants.MONGO_LOCAL_DATABASE);
-        FindIterable<Document> iterable;
-        if (replicaSetConfig.getPosition().isValid()) {
-            iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find(
-                Filters.gt("ts", replicaSetConfig.getPosition().converBsonTimeStamp()));
-        } else {
-            iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find();
+        // Tail the oplog only while the replica set is running and the initial sync has not aborted.
+        if (!replicaSet.isRuning() || replicaSetsContext.isInitSyncAbort()) {
+            return;
         }
+        MongoDatabase localDataBase = mongoClient.getDatabase(Constants.MONGO_LOCAL_DATABASE);
+        FindIterable<Document> iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find(
+            Filters.gt("ts", replicaSetConfig.getPosition().converBsonTimeStamp()));
+
         MongoCursor<Document> cursor = iterable.sort(new Document("$natural", 1))
             .noCursorTimeout(true)
             .cursorType(CursorType.TailableAwait)
@@ -70,10 +96,26 @@ public class ReplicatorTask implements Runnable {
         logger.info("replicaSet:{}, already shutdown, replicaTask end of life cycle", replicaSetConfig);
     }
 
+    /** Returns the timestamp field of the first oplog document in natural order. */
+    private BsonTimestamp findOplogFirstPosition() {
+        Document oldestEntry = mongoClient.getDatabase(Constants.MONGO_LOCAL_DATABASE)
+            .getCollection(Constants.MONGO_OPLOG_RS)
+            .find().sort(new Document("$natural", 1)).limit(1).first();
+        return oldestEntry.get(Constants.TIMESTAMP, BsonTimestamp.class);
+    }
+
+    /** Stores the timestamp of the last oplog document (reverse natural order) as the position, with initSync=false. */
+    private void recordOplogLastPosition() {
+        Document newestEntry = mongoClient.getDatabase(Constants.MONGO_LOCAL_DATABASE)
+            .getCollection(Constants.MONGO_OPLOG_RS).find().sort(new Document("$natural", -1)).limit(1).first();
+        BsonTimestamp newest = newestEntry.get(Constants.TIMESTAMP, BsonTimestamp.class);
+        replicaSetConfig.setPosition(new Position(newest.getTime(), newest.getInc(), false));
+    }
+
     private void executorCursor(MongoCursor<Document> cursor) {
         while (cursor.hasNext() && !replicaSet.isPause()) {
             Document document = cursor.next();
-            ReplicationEvent event = EventConverter.convert(document, replicaSetConfig.getReplicaSetName());
+            ReplicationEvent event = Document2EventConverter.convert(document, replicaSetConfig.getReplicaSetName());
             if (replicaSetsContext.filterEvent(event)) {
                 replicaSetsContext.publishEvent(event, replicaSetConfig);
             }
diff --git a/src/main/java/org/apache/connect/mongo/replicator/event/EventConverter.java b/src/main/java/org/apache/connect/mongo/replicator/event/Document2EventConverter.java
similarity index 58%
rename from src/main/java/org/apache/connect/mongo/replicator/event/EventConverter.java
rename to src/main/java/org/apache/connect/mongo/replicator/event/Document2EventConverter.java
index 1b48990..99ab707 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/event/EventConverter.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/event/Document2EventConverter.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.connect.mongo.replicator.event;
 
 import java.util.Optional;
@@ -6,24 +23,24 @@ import org.bson.Document;
 
 import static org.apache.connect.mongo.replicator.Constants.HASH;
 import static org.apache.connect.mongo.replicator.Constants.NAMESPACE;
-import static org.apache.connect.mongo.replicator.Constants.OBJECTID;
+import static org.apache.connect.mongo.replicator.Constants.OBJECT_ID;
 import static org.apache.connect.mongo.replicator.Constants.OPERATION;
-import static org.apache.connect.mongo.replicator.Constants.OPERATIONTYPE;
+import static org.apache.connect.mongo.replicator.Constants.OPERATION_TYPE;
 import static org.apache.connect.mongo.replicator.Constants.TIMESTAMP;
 import static org.apache.connect.mongo.replicator.Constants.VERSION;
 
-public class EventConverter {
+public class Document2EventConverter {
 
     public static ReplicationEvent convert(Document document, String replicaSetName) {
 
         ReplicationEvent event = new ReplicationEvent();
-        event.setOperationType(OperationType.getOperationType(document.getString(OPERATIONTYPE)));
+        event.setOperationType(OperationType.getOperationType(document.getString(OPERATION_TYPE)));
         event.setTimestamp(document.get(TIMESTAMP, BsonTimestamp.class));
         event.setH(document.getLong(HASH));
         event.setV(document.getInteger(VERSION));
         event.setNamespace(document.getString(NAMESPACE));
         event.setEventData(Optional.ofNullable(document.get(OPERATION, Document.class)));
-        event.setObjectId(Optional.ofNullable(document.get(OBJECTID, Document.class)));
+        event.setObjectId(Optional.ofNullable(document.get(OBJECT_ID, Document.class)));
         event.setReplicaSetName(replicaSetName);
         event.setDocument(document);
         return event;
diff --git a/src/main/java/org/apache/connect/mongo/replicator/event/OperationType.java b/src/main/java/org/apache/connect/mongo/replicator/event/OperationType.java
index 54df394..b418666 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/event/OperationType.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/event/OperationType.java
@@ -1,28 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.connect.mongo.replicator.event;
 
+import org.apache.commons.lang3.StringUtils;
+
 public enum OperationType {
 
     INSERT("i"),
     UPDATE("u"),
     DELETE("d"),
     NOOP("n"),
-    DBCOMMAND("c"),
+    DB_COMMAND("c"),
     CREATED("created"),
     UNKNOWN("unknown");
 
-    private final String operationStr;
+    private final String operation;
 
-    OperationType(String operationStr) {
-        this.operationStr = operationStr;
+    OperationType(String operation) {
+        this.operation = operation;
     }
 
-    public static OperationType getOperationType(String operationStr) {
-        for (OperationType operationType : OperationType.values()) {
-            if (operationType.operationStr.equals(operationStr)) {
-                return operationType;
-            }
+    public static OperationType getOperationType(String operation) {
+
+        if (StringUtils.isEmpty(operation)) {
+            return UNKNOWN;
+        }
+
+        switch (operation) {
+            case "i":
+                return INSERT;
+            case "u":
+                return UPDATE;
+            case "d":
+                return DELETE;
+            case "n":
+                return NOOP;
+            case "c":
+                return DB_COMMAND;
+            case "created":
+                return CREATED;
+            default:
+                return UNKNOWN;
         }
-        return UNKNOWN;
     }
 
 }
diff --git a/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java b/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java
index 6407781..7adca71 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.connect.mongo.replicator.event;
 
 import io.openmessaging.connector.api.data.EntryType;
diff --git a/src/test/java/org/apache/connect/mongo/FilterTest.java b/src/test/java/org/apache/connect/mongo/FilterTest.java
index a804b8b..31c8f4d 100644
--- a/src/test/java/org/apache/connect/mongo/FilterTest.java
+++ b/src/test/java/org/apache/connect/mongo/FilterTest.java
@@ -59,8 +59,9 @@ public class FilterTest {
         ReplicationEvent replicationEvent = new ReplicationEvent();
         replicationEvent.setOperationType(OperationType.NOOP);
         Assert.assertFalse(filter.filterEvent(replicationEvent));
-        replicationEvent.setOperationType(OperationType.DBCOMMAND);
+        replicationEvent.setOperationType(OperationType.DB_COMMAND);
         Assert.assertTrue(filter.filterEvent(replicationEvent));
+
     }
 
 }
diff --git a/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java b/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java
index 93adeeb..0f02064 100644
--- a/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java
+++ b/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java
@@ -37,7 +37,7 @@ public class MongoFactoryTest {
 
     @Test
     public void testCreateMongoClientWithSSL() {
-        sourceTaskConfig.setSsl("ssl");
+        sourceTaskConfig.setSsl(true);
         MongoClientSettings settings = getSettings();
         System.out.println(settings.getSslSettings());
         Assert.assertTrue(settings.getSslSettings().isEnabled());
@@ -45,7 +45,7 @@ public class MongoFactoryTest {
 
     @Test
     public void testCreateMongoClientWithTSL() {
-        sourceTaskConfig.setTsl("tsl");
+        sourceTaskConfig.setTsl(true);
         MongoClientSettings settings = getSettings();
         System.out.println(settings.getSslSettings());
         Assert.assertTrue(settings.getSslSettings().isEnabled());
@@ -55,7 +55,7 @@ public class MongoFactoryTest {
     public void testCreateMongoClientWithserverSelectionTimeoutMS() {
         try {
             replicaSetConfig.setReplicaSetName("testReplicatSet");
-            sourceTaskConfig.setServerSelectionTimeoutMS("150");
+            sourceTaskConfig.setServerSelectionTimeoutMS(150);
             System.out.println(getSettings().getClusterSettings());
             Assert.assertTrue(getSettings().getClusterSettings().getServerSelectionTimeout(TimeUnit.MILLISECONDS) == 150);
         } catch (MongoTimeoutException exception) {
@@ -65,7 +65,7 @@ public class MongoFactoryTest {
 
     @Test
     public void testCreateMongoClientWithConnectTimeoutMS() {
-        sourceTaskConfig.setConnectTimeoutMS("1200");
+        sourceTaskConfig.setConnectTimeoutMS(1200);
         System.out.println(getSettings().getSocketSettings());
         Assert.assertTrue(getSettings().getSocketSettings().getConnectTimeout(TimeUnit.MILLISECONDS) == 1200);
 
@@ -73,40 +73,40 @@ public class MongoFactoryTest {
 
     @Test
     public void testCreateMongoClientWithSocketTimeoutMS() {
-        sourceTaskConfig.setSocketTimeoutMS("1100");
+        sourceTaskConfig.setSocketTimeoutMS(1100);
         System.out.println(getSettings().getSocketSettings());
         Assert.assertTrue(getSettings().getSocketSettings().getReadTimeout(TimeUnit.MILLISECONDS) == 1100);
     }
 
     @Test
     public void testCreateMongoClientWithInvalidHostNameAllowed() {
-        sourceTaskConfig.setSslInvalidHostNameAllowed("true");
+        sourceTaskConfig.setSslInvalidHostNameAllowed(true);
         System.out.println(getSettings().getSslSettings());
         Assert.assertTrue(getSettings().getSslSettings().isInvalidHostNameAllowed());
 
-        sourceTaskConfig.setSslInvalidHostNameAllowed("false");
+        sourceTaskConfig.setSslInvalidHostNameAllowed(false);
         System.out.println(getSettings().getSslSettings());
         Assert.assertFalse(getSettings().getSslSettings().isInvalidHostNameAllowed());
     }
 
     @Test
     public void testCreateMongoClientWithInvalidHostNameAllowedTsl() {
-        sourceTaskConfig.setTlsAllowInvalidHostnames("true");
+        sourceTaskConfig.setTlsAllowInvalidHostnames(true);
         System.out.println(getSettings().getSslSettings());
         Assert.assertTrue(getSettings().getSslSettings().isInvalidHostNameAllowed());
 
-        sourceTaskConfig.setTlsAllowInvalidHostnames("false");
+        sourceTaskConfig.setTlsAllowInvalidHostnames(false);
         System.out.println(getSettings().getSslSettings());
         Assert.assertFalse(getSettings().getSslSettings().isInvalidHostNameAllowed());
     }
 
     @Test
     public void testCreateMongoClientWithTlsinsecure() {
-        sourceTaskConfig.setTlsInsecure("true");
+        sourceTaskConfig.setTlsInsecure(true);
         System.out.println(getSettings().getSslSettings());
         Assert.assertTrue(getSettings().getSslSettings().isInvalidHostNameAllowed());
 
-        sourceTaskConfig.setTlsInsecure("false");
+        sourceTaskConfig.setTlsInsecure(false);
         System.out.println(getSettings().getSslSettings());
         Assert.assertFalse(getSettings().getSslSettings().isInvalidHostNameAllowed());
     }
diff --git a/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java b/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java
index f85e4a9..cc02fbc 100644
--- a/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java
+++ b/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java
@@ -12,6 +12,7 @@ import java.util.Optional;
 import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.connect.mongo.connector.MongoSourceConnector;
 import org.apache.connect.mongo.connector.MongoSourceTask;
+import org.apache.connect.mongo.replicator.Position;
 import org.apache.connect.mongo.replicator.ReplicaSetConfig;
 import org.apache.connect.mongo.replicator.ReplicaSetsContext;
 import org.apache.connect.mongo.replicator.event.OperationType;
@@ -73,7 +74,7 @@ public class MongoSourceConnectorTest {
         Assert.assertEquals("testReplicaName", new String(sourcePartition.array()));
 
         ByteBuffer sourcePosition = sourceDataEntry.getSourcePosition();
-        ReplicaSetConfig.Position position = JSONObject.parseObject(new String(sourcePosition.array()), ReplicaSetConfig.Position.class);
+        Position position = JSONObject.parseObject(new String(sourcePosition.array()), Position.class);
         Assert.assertEquals(position.getTimeStamp(), 1565609506);
         Assert.assertEquals(position.getInc(), 1);
         Assert.assertEquals(position.isInitSync(), false);
diff --git a/src/test/java/org/apache/connect/mongo/MongoSourceTaskTest.java b/src/test/java/org/apache/connect/mongo/MongoSourceTaskTest.java
index b696393..4983a66 100644
--- a/src/test/java/org/apache/connect/mongo/MongoSourceTaskTest.java
+++ b/src/test/java/org/apache/connect/mongo/MongoSourceTaskTest.java
@@ -14,7 +14,6 @@ import java.util.List;
 import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.connect.mongo.connector.MongoSourceTask;
-import org.apache.connect.mongo.replicator.Constants;
 import org.apache.connect.mongo.replicator.ReplicaSet;
 import org.apache.connect.mongo.replicator.ReplicaSetConfig;
 import org.apache.connect.mongo.replicator.ReplicaSetsContext;
@@ -31,7 +30,7 @@ public class MongoSourceTaskTest {
         defaultKeyValue.put("positionTimeStamp", "11111111");
         defaultKeyValue.put("positionInc", "111");
         defaultKeyValue.put("serverSelectionTimeoutMS", "10");
-        defaultKeyValue.put("dataSync", Constants.INITSYNC);
+        defaultKeyValue.put("dataSync", "true");
 
         Field context = SourceTask.class.getDeclaredField("context");
         context.setAccessible(true);
diff --git a/src/test/java/org/apache/connect/mongo/MongoTest.java b/src/test/java/org/apache/connect/mongo/MongoTest.java
index 98e9a42..3d900fa 100644
--- a/src/test/java/org/apache/connect/mongo/MongoTest.java
+++ b/src/test/java/org/apache/connect/mongo/MongoTest.java
@@ -19,10 +19,11 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.connect.mongo.initsync.InitSync;
 import org.apache.connect.mongo.replicator.Constants;
+import org.apache.connect.mongo.replicator.Position;
 import org.apache.connect.mongo.replicator.ReplicaSet;
 import org.apache.connect.mongo.replicator.ReplicaSetConfig;
 import org.apache.connect.mongo.replicator.ReplicaSetsContext;
-import org.apache.connect.mongo.replicator.event.EventConverter;
+import org.apache.connect.mongo.replicator.event.Document2EventConverter;
 import org.apache.connect.mongo.replicator.event.OperationType;
 import org.apache.connect.mongo.replicator.event.ReplicationEvent;
 import org.bson.BsonTimestamp;
@@ -49,11 +50,11 @@ public class MongoTest {
         oplog.put(Constants.TIMESTAMP, timestamp);
         oplog.put(Constants.NAMESPACE, "test.person");
         oplog.put(Constants.HASH, 11111L);
-        oplog.put(Constants.OPERATIONTYPE, "i");
+        oplog.put(Constants.OPERATION_TYPE, "i");
         Document document = new Document();
         document.put("test", "test");
         oplog.put(Constants.OPERATION, document);
-        ReplicationEvent event = EventConverter.convert(oplog, "testR");
+        ReplicationEvent event = Document2EventConverter.convert(oplog, "testR");
         Assert.assertEquals(timestamp, event.getTimestamp());
         Assert.assertEquals("test.person", event.getNamespace());
         Assert.assertTrue(11111L == event.getH());
@@ -95,16 +96,16 @@ public class MongoTest {
         int syncCount = 0;
         while (syncCount < count) {
             Collection<SourceDataEntry> sourceDataEntries = replicaSetsContext.poll();
-            Assert.assertNotNull(sourceDataEntries);
+            Assert.assertTrue(sourceDataEntries.size() > 0);
             for (SourceDataEntry sourceDataEntry : sourceDataEntries) {
                 ByteBuffer sourcePartition = sourceDataEntry.getSourcePartition();
                 Assert.assertEquals("test", new String(sourcePartition.array()));
                 ByteBuffer sourcePosition = sourceDataEntry.getSourcePosition();
-                ReplicaSetConfig.Position position = replicaSetConfig.emptyPosition();
+                Position position = new Position();
                 position.setInitSync(true);
                 position.setTimeStamp(0);
                 position.setInc(0);
-                Assert.assertEquals(position, JSONObject.parseObject(new String(sourcePosition.array()), ReplicaSetConfig.Position.class));
+                Assert.assertEquals(position, JSONObject.parseObject(new String(sourcePosition.array()), Position.class));
                 EntryType entryType = sourceDataEntry.getEntryType();
                 Assert.assertEquals(EntryType.CREATE, entryType);
                 String queueName = sourceDataEntry.getQueueName();
@@ -122,4 +123,16 @@ public class MongoTest {
 
         Assert.assertTrue(syncCount == count);
     }
+
+    @Test
+    public void testCompareBsonTimestamp() {
+        BsonTimestamp lt = new BsonTimestamp(11111111, 1);
+        BsonTimestamp gt = new BsonTimestamp(11111111, 2);
+        Assert.assertTrue(lt.compareTo(gt) < 0);
+
+        lt = new BsonTimestamp(11111111, 1);
+        gt = new BsonTimestamp(22222222, 1);
+        Assert.assertTrue(lt.compareTo(gt) < 0);
+
+    }
 }
diff --git a/src/test/java/org/apache/connect/mongo/OperationTypeTest.java b/src/test/java/org/apache/connect/mongo/OperationTypeTest.java
new file mode 100644
index 0000000..d8c5a9b
--- /dev/null
+++ b/src/test/java/org/apache/connect/mongo/OperationTypeTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.connect.mongo;
+
+import org.apache.connect.mongo.replicator.event.OperationType;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class OperationTypeTest {
+
+    @Test
+    public void testGetOperationType() {
+        Assert.assertEquals(OperationType.INSERT, OperationType.getOperationType("i"));
+        Assert.assertEquals(OperationType.UPDATE, OperationType.getOperationType("u"));
+        Assert.assertEquals(OperationType.DELETE, OperationType.getOperationType("d"));
+        Assert.assertEquals(OperationType.NOOP, OperationType.getOperationType("n"));
+        Assert.assertEquals(OperationType.DB_COMMAND, OperationType.getOperationType("c"));
+        Assert.assertEquals(OperationType.CREATED, OperationType.getOperationType("created"));
+        Assert.assertEquals(OperationType.UNKNOWN, OperationType.getOperationType("test"));
+
+    }
+}
diff --git a/src/test/java/org/apache/connect/mongo/ReplicaSetsTest.java b/src/test/java/org/apache/connect/mongo/ReplicaSetManagerTest.java
similarity index 75%
rename from src/test/java/org/apache/connect/mongo/ReplicaSetsTest.java
rename to src/test/java/org/apache/connect/mongo/ReplicaSetManagerTest.java
index 5276f4f..1d3b743 100644
--- a/src/test/java/org/apache/connect/mongo/ReplicaSetsTest.java
+++ b/src/test/java/org/apache/connect/mongo/ReplicaSetManagerTest.java
@@ -2,26 +2,26 @@ package org.apache.connect.mongo;
 
 import java.util.Map;
 import org.apache.connect.mongo.replicator.ReplicaSetConfig;
-import org.apache.connect.mongo.replicator.ReplicaSets;
+import org.apache.connect.mongo.replicator.ReplicaSetManager;
 import org.junit.Assert;
 import org.junit.Test;
 
-public class ReplicaSetsTest {
+public class ReplicaSetManagerTest {
 
     @Test(expected = IllegalArgumentException.class)
     public void testCreatReplicaSetsExceptionWithoutMongoAddr() {
-        ReplicaSets.create("");
+        ReplicaSetManager.create("");
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testCreatReplicaSetsExceptioWithoutReplicaSetName() {
-        ReplicaSets.create("127.0.0.1:27081");
+        ReplicaSetManager.create("127.0.0.1:27081");
     }
 
     @Test
     public void testCreatReplicaSetsSpecialReplicaSetName() {
-        ReplicaSets replicaSets = ReplicaSets.create("replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083");
-        Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSets.getReplicaConfigByName();
+        ReplicaSetManager replicaSetManager = ReplicaSetManager.create("replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083");
+        Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSetManager.getReplicaConfigByName();
         Assert.assertTrue(replicaSetConfigMap.size() == 1);
         Assert.assertNotNull(replicaSetConfigMap.get("replicaName1"));
         Assert.assertEquals("127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083", replicaSetConfigMap.get("replicaName1").getHost());
@@ -30,8 +30,8 @@ public class ReplicaSetsTest {
 
     @Test
     public void testCreatReplicaSetsSpecialShardNameAndReplicaSetName() {
-        ReplicaSets replicaSets = ReplicaSets.create("shardName1=replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083");
-        Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSets.getReplicaConfigByName();
+        ReplicaSetManager replicaSetManager = ReplicaSetManager.create("shardName1=replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083");
+        Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSetManager.getReplicaConfigByName();
         Assert.assertTrue(replicaSetConfigMap.size() == 1);
         Assert.assertNotNull(replicaSetConfigMap.get("replicaName1"));
         Assert.assertEquals("127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083", replicaSetConfigMap.get("replicaName1").getHost());
@@ -41,8 +41,8 @@ public class ReplicaSetsTest {
 
     @Test
     public void testCreatReplicaSetsMutiMongoAddr() {
-        ReplicaSets replicaSets = ReplicaSets.create("shardName1=replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083;shardName2=replicaName2/127.0.0.1:27281,127.0.0.1:27282,127.0.0.1:27283");
-        Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSets.getReplicaConfigByName();
+        ReplicaSetManager replicaSetManager = ReplicaSetManager.create("shardName1=replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083;shardName2=replicaName2/127.0.0.1:27281,127.0.0.1:27282,127.0.0.1:27283");
+        Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSetManager.getReplicaConfigByName();
         Assert.assertTrue(replicaSetConfigMap.size() == 2);
         Assert.assertNotNull(replicaSetConfigMap.get("replicaName1"));
         Assert.assertEquals("127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083", replicaSetConfigMap.get("replicaName1").getHost());