You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:22:27 UTC

[rocketmq-connect] 04/13: fix some bug

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 32d316512cfb0e31d9255235695cbead65799891
Author: 李平 <lp...@alibaba-inc.com>
AuthorDate: Thu Aug 8 16:18:41 2019 +0800

    fix some bug
---
 .../connect/mongo/MongoReplicatorConfig.java       |  45 +++++----
 .../connect/mongo/connector/MongoSourceTask.java   |  14 ++-
 .../apache/connect/mongo/initsync/InitSync.java    |  38 +++-----
 .../apache/connect/mongo/replicator/Filter.java    |  80 ++++++++--------
 .../connect/mongo/replicator/MongoReplicator.java  |  23 +++--
 .../replicator/event/DocumentConvertEvent.java     |  24 ++---
 .../java/org/apache/connect/mongo/FilterTest.java  |  42 +++++----
 .../java/org/apache/connect/mongo/MongoTest.java   | 105 +++++++++++++++++++++
 .../org/apache/connect/mongo/ReplicatorTest.java   |  33 -------
 9 files changed, 228 insertions(+), 176 deletions(-)

diff --git a/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java b/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java
index 18a834f..9f17aab 100644
--- a/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java
+++ b/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java
@@ -1,6 +1,7 @@
 package org.apache.connect.mongo;
 
 import io.openmessaging.KeyValue;
+import org.apache.commons.lang3.StringUtils;
 
 import java.lang.reflect.Method;
 import java.util.HashSet;
@@ -8,12 +9,11 @@ import java.util.Set;
 
 public class MongoReplicatorConfig {
 
+    private String replicaSet;
     private String mongoAddr;
-    private int mongoPort;
     private String mongoUserName;
     private String mongoPassWord;
-    private String interestDB;
-    private String interestCollection;
+    private String interestDbAndCollection;
     private long positionTimeStamp;
     private int positionInc;
     private String dataSync;
@@ -23,7 +23,6 @@ public class MongoReplicatorConfig {
     public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
         {
             add("mongoAddr");
-            add("mongoPort");
         }
     };
 
@@ -51,20 +50,12 @@ public class MongoReplicatorConfig {
         this.positionTimeStamp = positionTimeStamp;
     }
 
-    public String getInterestDB() {
-        return interestDB;
+    public String getInterestDbAndCollection() {
+        return interestDbAndCollection;
     }
 
-    public void setInterestDB(String interestDB) {
-        this.interestDB = interestDB;
-    }
-
-    public String getInterestCollection() {
-        return interestCollection;
-    }
-
-    public void setInterestCollection(String interestCollection) {
-        this.interestCollection = interestCollection;
+    public void setInterestDbAndCollection(String interestDbAndCollection) {
+        this.interestDbAndCollection = interestDbAndCollection;
     }
 
     public String getMongoAddr() {
@@ -75,13 +66,6 @@ public class MongoReplicatorConfig {
         this.mongoAddr = mongoAddr;
     }
 
-    public int getMongoPort() {
-        return mongoPort;
-    }
-
-    public void setMongoPort(int mongoPort) {
-        this.mongoPort = mongoPort;
-    }
 
     public String getMongoUserName() {
         return mongoUserName;
@@ -109,6 +93,14 @@ public class MongoReplicatorConfig {
     }
 
 
+    public String getReplicaSet() {
+        return replicaSet;
+    }
+
+    public void setReplicaSet(String replicaSet) {
+        this.replicaSet = replicaSet;
+    }
+
     public void load(KeyValue props) {
 
         properties2Object(props, this);
@@ -154,4 +146,11 @@ public class MongoReplicatorConfig {
             }
         }
     }
+
+    public String getDataSouce() { // returns mongoAddr alone, or "replicaSet:mongoAddr" when a replica set name is configured
+        if (StringUtils.isBlank(replicaSet)) {
+            return mongoAddr;
+        }
+        return replicaSet + ":" + mongoAddr;
+    }
 }
diff --git a/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
index 40dfdd0..9176ab7 100644
--- a/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
+++ b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
@@ -44,7 +44,7 @@ public class MongoSourceTask extends SourceTask {
         buildFieleds(schema);
         DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
         dataEntryBuilder.timestamp(System.currentTimeMillis())
-                .queue(event.getNamespace())
+                .queue(event.getNamespace().replace(".", "-"))
                 .entryType(event.getEntryType());
 
         if (event.getOperationType().ordinal() == OperationType.CREATED.ordinal()) {
@@ -71,9 +71,7 @@ public class MongoSourceTask extends SourceTask {
             replicatorConfig = new MongoReplicatorConfig();
             replicatorConfig.load(config);
             mongoReplicator = new MongoReplicator(replicatorConfig);
-            mongoSource = new StringBuilder()
-                    .append(replicatorConfig.getMongoAddr())
-                    .append(replicatorConfig.getMongoPort()).toString();
+            mongoSource = replicatorConfig.getDataSouce();
             ByteBuffer position = this.context.positionStorageReader().getPosition(ByteBuffer.wrap(
                     mongoSource.getBytes()));
 
@@ -121,7 +119,7 @@ public class MongoSourceTask extends SourceTask {
         schema.getFields().add(operation);
         Field patch = new Field(5, Constants.PATCH, FieldType.STRING);
         schema.getFields().add(patch);
-        Field objectId = new Field(5, Constants.OBJECTID, FieldType.STRING);
+        Field objectId = new Field(6, Constants.OBJECTID, FieldType.STRING);
         schema.getFields().add(objectId);
     }
 
@@ -134,9 +132,9 @@ public class MongoSourceTask extends SourceTask {
                 jsonObject.put(Constants.INITSYNC, true);
                 break;
             default:
-                jsonObject.put(Constants.POSITION_TIMESTAMP, 0);
-                jsonObject.put(Constants.POSITION_INC, 0);
-                jsonObject.put(Constants.INITSYNC, true);
+                jsonObject.put(Constants.POSITION_TIMESTAMP, event.getTimestamp().getTime());
+                jsonObject.put(Constants.POSITION_INC, event.getTimestamp().getInc());
+                jsonObject.put(Constants.INITSYNC, false);
                 break;
         }
         return jsonObject;
diff --git a/src/main/java/org/apache/connect/mongo/initsync/InitSync.java b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
index 4b3a238..e12412b 100644
--- a/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
+++ b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
@@ -27,7 +27,6 @@ public class InitSync {
     private MongoClient mongoClient;
     private Filter filter;
     private int copyThreadCount;
-    private Set<String> interestDataBases;
     private Set<CollectionMeta> interestCollections;
     private CountDownLatch countDownLatch;
     private MongoReplicator mongoReplicator;
@@ -53,8 +52,7 @@ public class InitSync {
     }
 
     private void init() {
-        interestDataBases = getInterestDataBase();
-        interestCollections = getInterestCollection(interestDataBases);
+        interestCollections = getInterestCollection();
         copyThreadCount = Math.min(interestCollections.size(), mongoReplicatorConfig.getCopyThread());
         copyExecutor = Executors.newFixedThreadPool(copyThreadCount, new ThreadFactory() {
 
@@ -68,15 +66,17 @@ public class InitSync {
         countDownLatch = new CountDownLatch(interestCollections.size());
     }
 
-    private Set<CollectionMeta> getInterestCollection(Set<String> interestDataBases) {
+    private Set<CollectionMeta> getInterestCollection() {
         Set<CollectionMeta> res = new HashSet<>();
-        for (String interestDataBase : interestDataBases) {
-            MongoIterable<String> collectionNames = mongoClient.getDatabase(interestDataBase).listCollectionNames();
-            MongoCursor<String> iterator = collectionNames.iterator();
-            while (iterator.hasNext()) {
-                String collectionName = iterator.next();
-                if (filter.filterCollectionName(collectionName)) {
-                    CollectionMeta collectionMeta = new CollectionMeta(interestDataBase, collectionName);
+        MongoIterable<String> databaseNames = mongoClient.listDatabaseNames();
+        MongoCursor<String> dbIterator = databaseNames.iterator();
+        while (dbIterator.hasNext()) {
+            String dataBaseName = dbIterator.next();
+            MongoCursor<String> collIterator = mongoClient.getDatabase(dataBaseName).listCollectionNames().iterator();
+            while (collIterator.hasNext()) {
+                String collectionName = collIterator.next();
+                CollectionMeta collectionMeta = new CollectionMeta(dataBaseName, collectionName);
+                if (filter.filter(collectionMeta)) {
                     res.add(collectionMeta);
                 }
             }
@@ -86,19 +86,6 @@ public class InitSync {
 
     }
 
-    private Set<String> getInterestDataBase() {
-        Set<String> res = new HashSet<>();
-        MongoIterable<String> databaseNames = mongoClient.listDatabaseNames();
-        MongoCursor<String> iterator = databaseNames.iterator();
-        while (iterator.hasNext()) {
-            String dataBaseName = iterator.next();
-            if (filter.filterDatabaseName(dataBaseName)) {
-                res.add(dataBaseName);
-            }
-        }
-
-        return res;
-    }
 
     class CopyRunner implements Runnable {
 
@@ -132,6 +119,9 @@ public class InitSync {
                     event.setNamespace(collectionMeta.getNameSpace());
                     mongoReplicator.publishEvent(event);
                 }
+
+            } catch (Exception e) {
+                logger.error("init sync database:{}, collection:{} error", collectionMeta.getDatabaseName(), collectionMeta.getNameSpace(), e); // pass e last so the cause and stack trace are logged
             } finally {
                 countDownLatch.countDown();
             }
diff --git a/src/main/java/org/apache/connect/mongo/replicator/Filter.java b/src/main/java/org/apache/connect/mongo/replicator/Filter.java
index 05dec54..4a62e41 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/Filter.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/Filter.java
@@ -1,74 +1,68 @@
 package org.apache.connect.mongo.replicator;
 
 
-import org.apache.commons.lang3.ArrayUtils;
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.TypeReference;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.connect.mongo.MongoReplicatorConfig;
+import org.apache.connect.mongo.initsync.CollectionMeta;
 import org.apache.connect.mongo.replicator.event.OperationType;
 import org.apache.connect.mongo.replicator.event.ReplicationEvent;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.function.Function;
 
 public class Filter {
 
-    private Function<String, Boolean> dbFilter;
-    private Function<String, Boolean> collectionFilter;
-    private Function<OperationType, Boolean> noopFilter;
+    private Function<CollectionMeta, Boolean> dbAndCollectionFilter;
+    private Map<String, List<String>> interestMap = new HashMap<>();
+    private Function<OperationType, Boolean> notNoopFilter;
 
 
     public Filter(MongoReplicatorConfig mongoReplicatorConfig) {
-        if (StringUtils.isNotBlank(mongoReplicatorConfig.getInterestDB())) {
-            dbFilter = (dataBaseName) -> {
-                if (StringUtils.isBlank(dataBaseName)) {
-                    return true;
-                }
-                String interestDB = mongoReplicatorConfig.getInterestDB();
-                String[] db = StringUtils.split(interestDB, ",");
-                if (ArrayUtils.contains(db, dataBaseName)) {
-                    return true;
-                }
 
-                return false;
-            };
-        } else {
-            dbFilter = (dataBaseName) -> true;
-        }
+        String interestDbAndCollection = mongoReplicatorConfig.getInterestDbAndCollection();
 
+        if (StringUtils.isNotBlank(interestDbAndCollection)) {
+            JSONObject jsonObject = JSONObject.parseObject(interestDbAndCollection);
+            for (String db : jsonObject.keySet()) {
+                List<String> collections = jsonObject.getObject(db, new TypeReference<List<String>>() {
+                });
+                interestMap.put(db, collections);
+            }
 
-        if (StringUtils.isNotBlank(mongoReplicatorConfig.getInterestCollection())) {
-            collectionFilter = (collectionName) -> {
-                if (StringUtils.isBlank(collectionName)) {
-                    return true;
-                }
 
-                String interestCollection = mongoReplicatorConfig.getInterestCollection();
-                String[] coll = StringUtils.split(interestCollection, ",");
-                if (ArrayUtils.contains(coll, collectionName)) {
-                    return true;
-                }
-                return false;
-            };
-        } else {
-            collectionFilter = (collectionName) -> true;
         }
 
-        noopFilter = (opeartionType) -> opeartionType.ordinal() != OperationType.NOOP.ordinal();
-    }
+        dbAndCollectionFilter = (collectionMeta) -> {
+            if (interestMap.size() == 0) {
+                return true;
+            }
+            List<String> collections = interestMap.get(collectionMeta.getDatabaseName());
 
+            if (collections == null || collections.size() == 0) {
+                return false;
+            }
+
+            if (collections.contains("*") || collections.contains(collectionMeta.getCollectionName())) {
+                return true;
+            }
 
-    public boolean filterDatabaseName(String dataBaseName) {
-        return dbFilter.apply(dataBaseName);
+            return false;
+        };
+
+        notNoopFilter = (opeartionType) -> opeartionType.ordinal() != OperationType.NOOP.ordinal();
     }
 
 
-    public boolean filterCollectionName(String collectionName) {
-        return collectionFilter.apply(collectionName);
+    public boolean filter(CollectionMeta collectionMeta) {
+        return dbAndCollectionFilter.apply(collectionMeta);
     }
 
     public boolean filterEvent(ReplicationEvent event) {
-        return dbFilter.apply(event.getDatabaseName())
-                && collectionFilter.apply(event.getCollectionName())
-                && noopFilter.apply(event.getOperationType());
+        return dbAndCollectionFilter.apply(new CollectionMeta(event.getDatabaseName(), event.getCollectionName()))
+                && notNoopFilter.apply(event.getOperationType());
     }
-
 }
diff --git a/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java b/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java
index a14ceee..0f8520b 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java
@@ -54,7 +54,8 @@ public class MongoReplicator {
                 return;
             }
 
-            this.clientSettings = MongoClientSettings.builder().applicationName(APPLICATION_NAME)
+            this.clientSettings = MongoClientSettings.builder()
+                    .applicationName(APPLICATION_NAME)
                     .applyConnectionString(connectionString)
                     .build();
             this.mongoClient = MongoClients.create(clientSettings);
@@ -68,7 +69,6 @@ public class MongoReplicator {
 
 
     private void buildConnectionString() {
-        checkConfig();
         StringBuilder sb = new StringBuilder();
         sb.append("mongodb://");
         if (StringUtils.isNotBlank(mongoReplicatorConfig.getMongoUserName())
@@ -80,19 +80,17 @@ public class MongoReplicator {
 
         }
         sb.append(mongoReplicatorConfig.getMongoAddr());
-        sb.append(":");
-        sb.append(mongoReplicatorConfig.getMongoPort());
-
+        sb.append("/");
+        if (StringUtils.isNotBlank(mongoReplicatorConfig.getReplicaSet())) { // add the replicaSet option only when a name is configured
+            sb.append("?");
+            sb.append("replicaSet=");
+            sb.append(mongoReplicatorConfig.getReplicaSet());
+        }
         this.connectionString = new ConnectionString(sb.toString());
     }
 
-    private void checkConfig() {
-        Validate.notBlank(mongoReplicatorConfig.getMongoAddr(), "mongo url is blank");
-        Validate.isTrue(mongoReplicatorConfig.getMongoPort() > 0 && mongoReplicatorConfig.getMongoPort() < 65535, "mongo port should >0 and <65535");
 
-    }
-
-    private boolean isReplicaMongo() {
+    public boolean isReplicaMongo() {
         MongoDatabase local = mongoClient.getDatabase(MONGO_LOCAL_DATABASE);
         MongoIterable<String> collectionNames = local.listCollectionNames();
         for (String collectionName : collectionNames) {
@@ -101,7 +99,7 @@ public class MongoReplicator {
             }
         }
         this.shutdown();
-        throw new IllegalStateException(String.format("url:%s, port:%s is not replica", mongoReplicatorConfig.getMongoAddr(), mongoReplicatorConfig.getMongoPort()));
+        throw new IllegalStateException(String.format("url:%s, set:%s is not replica", mongoReplicatorConfig.getMongoAddr(), mongoReplicatorConfig.getReplicaSet()));
     }
 
     public void shutdown() {
@@ -127,6 +125,7 @@ public class MongoReplicator {
     }
 
 
+
     public void pause() {
         pause = true;
     }
diff --git a/src/main/java/org/apache/connect/mongo/replicator/event/DocumentConvertEvent.java b/src/main/java/org/apache/connect/mongo/replicator/event/DocumentConvertEvent.java
index 6b902e7..a18aa52 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/event/DocumentConvertEvent.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/event/DocumentConvertEvent.java
@@ -10,26 +10,20 @@ import static org.apache.connect.mongo.replicator.Constants.*;
 
 public class DocumentConvertEvent {
 
+
     public static ReplicationEvent convert(Document document) {
-        ReplicationEvent event = null;
-        try {
 
-            OperationType operationType = OperationType.getOperationType(document.getString(OPERATIONTYPE));
-            BsonTimestamp timestamp = (BsonTimestamp) document.get(TIMESTAMP);
+        OperationType operationType = OperationType.getOperationType(document.getString(OPERATIONTYPE));
+        BsonTimestamp timestamp = (BsonTimestamp) document.get(TIMESTAMP);
 //                Long t = document.getLong("t");
-            Long h = document.getLong(HASH);
-            Integer v = document.getInteger(VERSION);
-            String nameSpace = document.getString(NAMESPACE);
+        Long h = document.getLong(HASH);
+        Integer v = document.getInteger(VERSION);
+        String nameSpace = document.getString(NAMESPACE);
 //                String uuid = document.getString("uuid");
 //                Date wall = document.getDate("wall");
-            Document operation = document.get(OPERATION, Document.class);
-            Document objectID = document.get(OBJECTID, Document.class);
-            event = new ReplicationEvent(operationType, timestamp, v, h, nameSpace, Optional.ofNullable(operation), Optional.ofNullable(objectID), document);
-        } catch (Exception e) {
-            System.out.println(e);
-        }
-
-        return event;
+        Document operation = document.get(OPERATION, Document.class);
+        Document objectID = document.get(OBJECTID, Document.class);
+        return new ReplicationEvent(operationType, timestamp, v, h, nameSpace, Optional.ofNullable(operation), Optional.ofNullable(objectID), document);
     }
 
 }
diff --git a/src/test/java/org/apache/connect/mongo/FilterTest.java b/src/test/java/org/apache/connect/mongo/FilterTest.java
index 24f36d9..2b14b13 100644
--- a/src/test/java/org/apache/connect/mongo/FilterTest.java
+++ b/src/test/java/org/apache/connect/mongo/FilterTest.java
@@ -1,5 +1,7 @@
 package org.apache.connect.mongo;
 
+import com.alibaba.fastjson.JSONObject;
+import org.apache.connect.mongo.initsync.CollectionMeta;
 import org.apache.connect.mongo.replicator.Filter;
 import org.apache.connect.mongo.replicator.event.OperationType;
 import org.apache.connect.mongo.replicator.event.ReplicationEvent;
@@ -7,51 +9,55 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 public class FilterTest {
 
 
     private MongoReplicatorConfig config;
-
+    private Map<String, List<String>> insterest;
 
     @Before
     public void init() {
         config = new MongoReplicatorConfig();
-
+        insterest = new HashMap<>();
     }
 
     @Test
     public void testSpecialDb() {
-        config.setInterestDB("test,admin");
+        List<String> collections = new ArrayList<>();
+        collections.add("person");
+        insterest.put("test", collections);
+        config.setInterestDbAndCollection(JSONObject.toJSONString(insterest));
         Filter filter = new Filter(config);
-        Assert.assertTrue(filter.filterDatabaseName("test"));
-        Assert.assertFalse(filter.filterDatabaseName("test01"));
+        Assert.assertTrue(filter.filter(new CollectionMeta("test", "person")));
+        Assert.assertFalse(filter.filter(new CollectionMeta("test", "person01")));
     }
 
 
     @Test
     public void testBlankDb() {
         Filter filter = new Filter(config);
-        Assert.assertTrue(filter.filterDatabaseName("test"));
-        Assert.assertTrue(filter.filterDatabaseName("test01"));
+        Assert.assertTrue(filter.filter(new CollectionMeta("test" ,"test")));
+        Assert.assertTrue(filter.filter(new CollectionMeta("test1" ,"test01")));
     }
 
 
     @Test
-    public void testSpecialCollection() {
-        config.setInterestCollection("test,admin");
+    public void testAsterisk() {
+        List<String> collections = new ArrayList<>();
+        collections.add("*");
+        insterest.put("test", collections);
+        config.setInterestDbAndCollection(JSONObject.toJSONString(insterest));
         Filter filter = new Filter(config);
-        Assert.assertTrue(filter.filterCollectionName("test"));
-        Assert.assertFalse(filter.filterCollectionName("test01"));
+        Assert.assertTrue(filter.filter(new CollectionMeta("test", "testsad")));
+        Assert.assertTrue(filter.filter(new CollectionMeta("test", "tests032")));
     }
 
 
-    @Test
-    public void testBlankCollection() {
-        Filter filter = new Filter(config);
-        Assert.assertTrue(filter.filterCollectionName("test"));
-        Assert.assertTrue(filter.filterCollectionName("test01"));
-    }
-
 
     @Test
     public void testFilterEvent() {
diff --git a/src/test/java/org/apache/connect/mongo/MongoTest.java b/src/test/java/org/apache/connect/mongo/MongoTest.java
new file mode 100644
index 0000000..849c00c
--- /dev/null
+++ b/src/test/java/org/apache/connect/mongo/MongoTest.java
@@ -0,0 +1,105 @@
+package org.apache.connect.mongo;
+
+import com.alibaba.fastjson.JSONObject;
+import com.mongodb.ConnectionString;
+import com.mongodb.CursorType;
+import com.mongodb.MongoClientSettings;
+import com.mongodb.client.*;
+import com.mongodb.client.model.Filters;
+import io.openmessaging.connector.api.data.EntryType;
+import org.apache.connect.mongo.initsync.InitSync;
+import org.apache.connect.mongo.replicator.Constants;
+import org.apache.connect.mongo.replicator.Filter;
+import org.apache.connect.mongo.replicator.MongoReplicator;
+import org.apache.connect.mongo.replicator.event.DocumentConvertEvent;
+import org.apache.connect.mongo.replicator.event.OperationType;
+import org.apache.connect.mongo.replicator.event.ReplicationEvent;
+import org.bson.BsonTimestamp;
+import org.bson.Document;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class MongoTest {
+
+    private MongoClient mongoClient;
+
+    @Before
+    public void before() {
+        MongoClientSettings.Builder builder = MongoClientSettings.builder();
+        builder.applyConnectionString(new ConnectionString("mongodb://127.0.0.1:27077"));
+        mongoClient = MongoClients.create(builder.build());
+    }
+
+
+    @Test
+    public void testConvertEvent() {
+        Document oplog = new Document();
+        BsonTimestamp timestamp = new BsonTimestamp(1565074665, 10);
+        oplog.put(Constants.TIMESTAMP, timestamp);
+        oplog.put(Constants.NAMESPACE, "test.person");
+        oplog.put(Constants.HASH, 11111L);
+        oplog.put(Constants.OPERATIONTYPE, "i");
+        Document document = new Document();
+        document.put("test", "test");
+        oplog.put(Constants.OPERATION, document);
+        ReplicationEvent event = DocumentConvertEvent.convert(oplog);
+        Assert.assertEquals(timestamp, event.getTimestamp());
+        Assert.assertEquals("test.person", event.getNamespace());
+        Assert.assertTrue(11111L == event.getH());
+        Assert.assertEquals(OperationType.INSERT, event.getOperationType());
+        Assert.assertEquals(EntryType.CREATE, event.getEntryType());
+        Assert.assertEquals(document, event.getEventData().get());
+
+
+    }
+
+
+    /** Inserts 100 documents into test.person, runs InitSync, and expects 100 CREATED events carrying inserted documents. */
+    @Test
+    public void testInitSyncCopy() throws NoSuchFieldException, IllegalAccessException, InterruptedException {
+        MongoCollection<Document> collection = mongoClient.getDatabase("test").getCollection("person");
+        collection.deleteMany(new Document());
+        int count = 100;
+        List<Document> documents = new ArrayList<>(count);
+        for (int i = 0; i < count; i++) {
+            Document document = new Document();
+            document.put("name", "test" + i);
+            document.put("age", i);
+            document.put("sex", i % 2 == 0 ? "boy" : "girl");
+            collection.insertOne(document);
+            documents.add(document);
+        }
+        MongoReplicatorConfig config = new MongoReplicatorConfig();
+        Map<String, List<String>> insterest = new HashMap<>();
+        List<String> collections = new ArrayList<>();
+        collections.add("*");
+        insterest.put("test", collections);
+        config.setInterestDbAndCollection(JSONObject.toJSONString(insterest));
+        MongoReplicator mongoReplicator = new MongoReplicator(config);
+        Field running = MongoReplicator.class.getDeclaredField("running");
+        running.setAccessible(true);
+        running.set(mongoReplicator, new AtomicBoolean(true));
+        InitSync initSync = new InitSync(config, mongoClient, new Filter(config), mongoReplicator);
+        initSync.start();
+        BlockingQueue<ReplicationEvent> queue = mongoReplicator.getQueue();
+        while (count > 0) {
+            count--;
+            ReplicationEvent event = queue.poll(100, TimeUnit.MILLISECONDS);
+            // poll returns null on timeout; fail with a clear message instead of a NullPointerException
+            Assert.assertNotNull("init sync event was not published within 100ms", event);
+            Assert.assertEquals(OperationType.CREATED, event.getOperationType());
+            Assert.assertNotNull(event.getDocument());
+            Assert.assertTrue(documents.contains(event.getDocument()));
+        }
+    }
+}
diff --git a/src/test/java/org/apache/connect/mongo/ReplicatorTest.java b/src/test/java/org/apache/connect/mongo/ReplicatorTest.java
deleted file mode 100644
index b0319f1..0000000
--- a/src/test/java/org/apache/connect/mongo/ReplicatorTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package org.apache.connect.mongo;
-
-import org.apache.connect.mongo.replicator.MongoReplicator;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ReplicatorTest {
-
-    private MongoReplicatorConfig config;
-
-    @Before
-    public void before() {
-        config = new MongoReplicatorConfig();
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testNoPort() {
-        config.setMongoAddr("127.0.0.1");
-        MongoReplicator mongoReplicator = new MongoReplicator(config);
-        mongoReplicator.start();
-    }
-
-
-    @Test
-    public void testNoPort1() {
-        config.setMongoAddr("127.0.0.1");
-        config.setMongoPort(27012);
-        MongoReplicator mongoReplicator = new MongoReplicator(config);
-        mongoReplicator.start();
-    }
-
-
-}