You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:22:27 UTC
[rocketmq-connect] 04/13: fix some bug
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 32d316512cfb0e31d9255235695cbead65799891
Author: 李平 <lp...@alibaba-inc.com>
AuthorDate: Thu Aug 8 16:18:41 2019 +0800
fix some bug
---
.../connect/mongo/MongoReplicatorConfig.java | 45 +++++----
.../connect/mongo/connector/MongoSourceTask.java | 14 ++-
.../apache/connect/mongo/initsync/InitSync.java | 38 +++-----
.../apache/connect/mongo/replicator/Filter.java | 80 ++++++++--------
.../connect/mongo/replicator/MongoReplicator.java | 23 +++--
.../replicator/event/DocumentConvertEvent.java | 24 ++---
.../java/org/apache/connect/mongo/FilterTest.java | 42 +++++----
.../java/org/apache/connect/mongo/MongoTest.java | 105 +++++++++++++++++++++
.../org/apache/connect/mongo/ReplicatorTest.java | 33 -------
9 files changed, 228 insertions(+), 176 deletions(-)
diff --git a/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java b/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java
index 18a834f..9f17aab 100644
--- a/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java
+++ b/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java
@@ -1,6 +1,7 @@
package org.apache.connect.mongo;
import io.openmessaging.KeyValue;
+import org.apache.commons.lang3.StringUtils;
import java.lang.reflect.Method;
import java.util.HashSet;
@@ -8,12 +9,11 @@ import java.util.Set;
public class MongoReplicatorConfig {
+ private String replicaSet;
private String mongoAddr;
- private int mongoPort;
private String mongoUserName;
private String mongoPassWord;
- private String interestDB;
- private String interestCollection;
+ private String interestDbAndCollection;
private long positionTimeStamp;
private int positionInc;
private String dataSync;
@@ -23,7 +23,6 @@ public class MongoReplicatorConfig {
public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
{
add("mongoAddr");
- add("mongoPort");
}
};
@@ -51,20 +50,12 @@ public class MongoReplicatorConfig {
this.positionTimeStamp = positionTimeStamp;
}
- public String getInterestDB() {
- return interestDB;
+ public String getInterestDbAndCollection() {
+ return interestDbAndCollection;
}
- public void setInterestDB(String interestDB) {
- this.interestDB = interestDB;
- }
-
- public String getInterestCollection() {
- return interestCollection;
- }
-
- public void setInterestCollection(String interestCollection) {
- this.interestCollection = interestCollection;
+ public void setInterestDbAndCollection(String interestDbAndCollection) {
+ this.interestDbAndCollection = interestDbAndCollection;
}
public String getMongoAddr() {
@@ -75,13 +66,6 @@ public class MongoReplicatorConfig {
this.mongoAddr = mongoAddr;
}
- public int getMongoPort() {
- return mongoPort;
- }
-
- public void setMongoPort(int mongoPort) {
- this.mongoPort = mongoPort;
- }
public String getMongoUserName() {
return mongoUserName;
@@ -109,6 +93,14 @@ public class MongoReplicatorConfig {
}
+ public String getReplicaSet() {
+ return replicaSet;
+ }
+
+ public void setReplicaSet(String replicaSet) {
+ this.replicaSet = replicaSet;
+ }
+
public void load(KeyValue props) {
properties2Object(props, this);
@@ -154,4 +146,11 @@ public class MongoReplicatorConfig {
}
}
}
+
+ public String getDataSouce() {
+ if (StringUtils.isBlank(replicaSet)) {
+ return mongoAddr;
+ }
+ return replicaSet + ":" + mongoAddr;
+ }
}
diff --git a/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
index 40dfdd0..9176ab7 100644
--- a/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
+++ b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
@@ -44,7 +44,7 @@ public class MongoSourceTask extends SourceTask {
buildFieleds(schema);
DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
dataEntryBuilder.timestamp(System.currentTimeMillis())
- .queue(event.getNamespace())
+ .queue(event.getNamespace().replace(".", "-"))
.entryType(event.getEntryType());
if (event.getOperationType().ordinal() == OperationType.CREATED.ordinal()) {
@@ -71,9 +71,7 @@ public class MongoSourceTask extends SourceTask {
replicatorConfig = new MongoReplicatorConfig();
replicatorConfig.load(config);
mongoReplicator = new MongoReplicator(replicatorConfig);
- mongoSource = new StringBuilder()
- .append(replicatorConfig.getMongoAddr())
- .append(replicatorConfig.getMongoPort()).toString();
+ mongoSource = replicatorConfig.getDataSouce();
ByteBuffer position = this.context.positionStorageReader().getPosition(ByteBuffer.wrap(
mongoSource.getBytes()));
@@ -121,7 +119,7 @@ public class MongoSourceTask extends SourceTask {
schema.getFields().add(operation);
Field patch = new Field(5, Constants.PATCH, FieldType.STRING);
schema.getFields().add(patch);
- Field objectId = new Field(5, Constants.OBJECTID, FieldType.STRING);
+ Field objectId = new Field(6, Constants.OBJECTID, FieldType.STRING);
schema.getFields().add(objectId);
}
@@ -134,9 +132,9 @@ public class MongoSourceTask extends SourceTask {
jsonObject.put(Constants.INITSYNC, true);
break;
default:
- jsonObject.put(Constants.POSITION_TIMESTAMP, 0);
- jsonObject.put(Constants.POSITION_INC, 0);
- jsonObject.put(Constants.INITSYNC, true);
+ jsonObject.put(Constants.POSITION_TIMESTAMP, event.getTimestamp().getTime());
+ jsonObject.put(Constants.POSITION_INC, event.getTimestamp().getInc());
+ jsonObject.put(Constants.INITSYNC, false);
break;
}
return jsonObject;
diff --git a/src/main/java/org/apache/connect/mongo/initsync/InitSync.java b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
index 4b3a238..e12412b 100644
--- a/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
+++ b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
@@ -27,7 +27,6 @@ public class InitSync {
private MongoClient mongoClient;
private Filter filter;
private int copyThreadCount;
- private Set<String> interestDataBases;
private Set<CollectionMeta> interestCollections;
private CountDownLatch countDownLatch;
private MongoReplicator mongoReplicator;
@@ -53,8 +52,7 @@ public class InitSync {
}
private void init() {
- interestDataBases = getInterestDataBase();
- interestCollections = getInterestCollection(interestDataBases);
+ interestCollections = getInterestCollection();
copyThreadCount = Math.min(interestCollections.size(), mongoReplicatorConfig.getCopyThread());
copyExecutor = Executors.newFixedThreadPool(copyThreadCount, new ThreadFactory() {
@@ -68,15 +66,17 @@ public class InitSync {
countDownLatch = new CountDownLatch(interestCollections.size());
}
- private Set<CollectionMeta> getInterestCollection(Set<String> interestDataBases) {
+ private Set<CollectionMeta> getInterestCollection() {
Set<CollectionMeta> res = new HashSet<>();
- for (String interestDataBase : interestDataBases) {
- MongoIterable<String> collectionNames = mongoClient.getDatabase(interestDataBase).listCollectionNames();
- MongoCursor<String> iterator = collectionNames.iterator();
- while (iterator.hasNext()) {
- String collectionName = iterator.next();
- if (filter.filterCollectionName(collectionName)) {
- CollectionMeta collectionMeta = new CollectionMeta(interestDataBase, collectionName);
+ MongoIterable<String> databaseNames = mongoClient.listDatabaseNames();
+ MongoCursor<String> dbIterator = databaseNames.iterator();
+ while (dbIterator.hasNext()) {
+ String dataBaseName = dbIterator.next();
+ MongoCursor<String> collIterator = mongoClient.getDatabase(dataBaseName).listCollectionNames().iterator();
+ while (collIterator.hasNext()) {
+ String collectionName = collIterator.next();
+ CollectionMeta collectionMeta = new CollectionMeta(dataBaseName, collectionName);
+ if (filter.filter(collectionMeta)) {
res.add(collectionMeta);
}
}
@@ -86,19 +86,6 @@ public class InitSync {
}
- private Set<String> getInterestDataBase() {
- Set<String> res = new HashSet<>();
- MongoIterable<String> databaseNames = mongoClient.listDatabaseNames();
- MongoCursor<String> iterator = databaseNames.iterator();
- while (iterator.hasNext()) {
- String dataBaseName = iterator.next();
- if (filter.filterDatabaseName(dataBaseName)) {
- res.add(dataBaseName);
- }
- }
-
- return res;
- }
class CopyRunner implements Runnable {
@@ -132,6 +119,9 @@ public class InitSync {
event.setNamespace(collectionMeta.getNameSpace());
mongoReplicator.publishEvent(event);
}
+
+ } catch (Exception e) {
+ logger.error("init sync database:{}, collection:{} error", collectionMeta.getDatabaseName(), collectionMeta.getNameSpace());
} finally {
countDownLatch.countDown();
}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/Filter.java b/src/main/java/org/apache/connect/mongo/replicator/Filter.java
index 05dec54..4a62e41 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/Filter.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/Filter.java
@@ -1,74 +1,68 @@
package org.apache.connect.mongo.replicator;
-import org.apache.commons.lang3.ArrayUtils;
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.TypeReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.connect.mongo.MongoReplicatorConfig;
+import org.apache.connect.mongo.initsync.CollectionMeta;
import org.apache.connect.mongo.replicator.event.OperationType;
import org.apache.connect.mongo.replicator.event.ReplicationEvent;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.function.Function;
public class Filter {
- private Function<String, Boolean> dbFilter;
- private Function<String, Boolean> collectionFilter;
- private Function<OperationType, Boolean> noopFilter;
+ private Function<CollectionMeta, Boolean> dbAndCollectionFilter;
+ private Map<String, List<String>> interestMap = new HashMap<>();
+ private Function<OperationType, Boolean> notNoopFilter;
public Filter(MongoReplicatorConfig mongoReplicatorConfig) {
- if (StringUtils.isNotBlank(mongoReplicatorConfig.getInterestDB())) {
- dbFilter = (dataBaseName) -> {
- if (StringUtils.isBlank(dataBaseName)) {
- return true;
- }
- String interestDB = mongoReplicatorConfig.getInterestDB();
- String[] db = StringUtils.split(interestDB, ",");
- if (ArrayUtils.contains(db, dataBaseName)) {
- return true;
- }
- return false;
- };
- } else {
- dbFilter = (dataBaseName) -> true;
- }
+ String interestDbAndCollection = mongoReplicatorConfig.getInterestDbAndCollection();
+ if (StringUtils.isNotBlank(interestDbAndCollection)) {
+ JSONObject jsonObject = JSONObject.parseObject(interestDbAndCollection);
+ for (String db : jsonObject.keySet()) {
+ List<String> collections = jsonObject.getObject(db, new TypeReference<List<String>>() {
+ });
+ interestMap.put(db, collections);
+ }
- if (StringUtils.isNotBlank(mongoReplicatorConfig.getInterestCollection())) {
- collectionFilter = (collectionName) -> {
- if (StringUtils.isBlank(collectionName)) {
- return true;
- }
- String interestCollection = mongoReplicatorConfig.getInterestCollection();
- String[] coll = StringUtils.split(interestCollection, ",");
- if (ArrayUtils.contains(coll, collectionName)) {
- return true;
- }
- return false;
- };
- } else {
- collectionFilter = (collectionName) -> true;
}
- noopFilter = (opeartionType) -> opeartionType.ordinal() != OperationType.NOOP.ordinal();
- }
+ dbAndCollectionFilter = (collectionMeta) -> {
+ if (interestMap.size() == 0) {
+ return true;
+ }
+ List<String> collections = interestMap.get(collectionMeta.getDatabaseName());
+ if (collections == null || collections.size() == 0) {
+ return false;
+ }
+
+ if (collections.contains("*") || collections.contains(collectionMeta.getCollectionName())) {
+ return true;
+ }
- public boolean filterDatabaseName(String dataBaseName) {
- return dbFilter.apply(dataBaseName);
+ return false;
+ };
+
+ notNoopFilter = (opeartionType) -> opeartionType.ordinal() != OperationType.NOOP.ordinal();
}
- public boolean filterCollectionName(String collectionName) {
- return collectionFilter.apply(collectionName);
+ public boolean filter(CollectionMeta collectionMeta) {
+ return dbAndCollectionFilter.apply(collectionMeta);
}
public boolean filterEvent(ReplicationEvent event) {
- return dbFilter.apply(event.getDatabaseName())
- && collectionFilter.apply(event.getCollectionName())
- && noopFilter.apply(event.getOperationType());
+ return dbAndCollectionFilter.apply(new CollectionMeta(event.getDatabaseName(), event.getCollectionName()))
+ && notNoopFilter.apply(event.getOperationType());
}
-
}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java b/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java
index a14ceee..0f8520b 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java
@@ -54,7 +54,8 @@ public class MongoReplicator {
return;
}
- this.clientSettings = MongoClientSettings.builder().applicationName(APPLICATION_NAME)
+ this.clientSettings = MongoClientSettings.builder()
+ .applicationName(APPLICATION_NAME)
.applyConnectionString(connectionString)
.build();
this.mongoClient = MongoClients.create(clientSettings);
@@ -68,7 +69,6 @@ public class MongoReplicator {
private void buildConnectionString() {
- checkConfig();
StringBuilder sb = new StringBuilder();
sb.append("mongodb://");
if (StringUtils.isNotBlank(mongoReplicatorConfig.getMongoUserName())
@@ -80,19 +80,17 @@ public class MongoReplicator {
}
sb.append(mongoReplicatorConfig.getMongoAddr());
- sb.append(":");
- sb.append(mongoReplicatorConfig.getMongoPort());
-
+ sb.append("/");
+ if (StringUtils.isBlank(mongoReplicatorConfig.getReplicaSet())) {
+ sb.append("?");
+ sb.append("replicaSet=");
+ sb.append(mongoReplicatorConfig.getReplicaSet());
+ }
this.connectionString = new ConnectionString(sb.toString());
}
- private void checkConfig() {
- Validate.notBlank(mongoReplicatorConfig.getMongoAddr(), "mongo url is blank");
- Validate.isTrue(mongoReplicatorConfig.getMongoPort() > 0 && mongoReplicatorConfig.getMongoPort() < 65535, "mongo port should >0 and <65535");
- }
-
- private boolean isReplicaMongo() {
+ public boolean isReplicaMongo() {
MongoDatabase local = mongoClient.getDatabase(MONGO_LOCAL_DATABASE);
MongoIterable<String> collectionNames = local.listCollectionNames();
for (String collectionName : collectionNames) {
@@ -101,7 +99,7 @@ public class MongoReplicator {
}
}
this.shutdown();
- throw new IllegalStateException(String.format("url:%s, port:%s is not replica", mongoReplicatorConfig.getMongoAddr(), mongoReplicatorConfig.getMongoPort()));
+ throw new IllegalStateException(String.format("url:%s, set:%s is not replica", mongoReplicatorConfig.getMongoAddr(), mongoReplicatorConfig.getReplicaSet()));
}
public void shutdown() {
@@ -127,6 +125,7 @@ public class MongoReplicator {
}
+
public void pause() {
pause = true;
}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/event/DocumentConvertEvent.java b/src/main/java/org/apache/connect/mongo/replicator/event/DocumentConvertEvent.java
index 6b902e7..a18aa52 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/event/DocumentConvertEvent.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/event/DocumentConvertEvent.java
@@ -10,26 +10,20 @@ import static org.apache.connect.mongo.replicator.Constants.*;
public class DocumentConvertEvent {
+
public static ReplicationEvent convert(Document document) {
- ReplicationEvent event = null;
- try {
- OperationType operationType = OperationType.getOperationType(document.getString(OPERATIONTYPE));
- BsonTimestamp timestamp = (BsonTimestamp) document.get(TIMESTAMP);
+ OperationType operationType = OperationType.getOperationType(document.getString(OPERATIONTYPE));
+ BsonTimestamp timestamp = (BsonTimestamp) document.get(TIMESTAMP);
// Long t = document.getLong("t");
- Long h = document.getLong(HASH);
- Integer v = document.getInteger(VERSION);
- String nameSpace = document.getString(NAMESPACE);
+ Long h = document.getLong(HASH);
+ Integer v = document.getInteger(VERSION);
+ String nameSpace = document.getString(NAMESPACE);
// String uuid = document.getString("uuid");
// Date wall = document.getDate("wall");
- Document operation = document.get(OPERATION, Document.class);
- Document objectID = document.get(OBJECTID, Document.class);
- event = new ReplicationEvent(operationType, timestamp, v, h, nameSpace, Optional.ofNullable(operation), Optional.ofNullable(objectID), document);
- } catch (Exception e) {
- System.out.println(e);
- }
-
- return event;
+ Document operation = document.get(OPERATION, Document.class);
+ Document objectID = document.get(OBJECTID, Document.class);
+ return new ReplicationEvent(operationType, timestamp, v, h, nameSpace, Optional.ofNullable(operation), Optional.ofNullable(objectID), document);
}
}
diff --git a/src/test/java/org/apache/connect/mongo/FilterTest.java b/src/test/java/org/apache/connect/mongo/FilterTest.java
index 24f36d9..2b14b13 100644
--- a/src/test/java/org/apache/connect/mongo/FilterTest.java
+++ b/src/test/java/org/apache/connect/mongo/FilterTest.java
@@ -1,5 +1,7 @@
package org.apache.connect.mongo;
+import com.alibaba.fastjson.JSONObject;
+import org.apache.connect.mongo.initsync.CollectionMeta;
import org.apache.connect.mongo.replicator.Filter;
import org.apache.connect.mongo.replicator.event.OperationType;
import org.apache.connect.mongo.replicator.event.ReplicationEvent;
@@ -7,51 +9,55 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
public class FilterTest {
private MongoReplicatorConfig config;
-
+ private Map<String, List<String>> insterest;
@Before
public void init() {
config = new MongoReplicatorConfig();
-
+ insterest = new HashMap<>();
}
@Test
public void testSpecialDb() {
- config.setInterestDB("test,admin");
+ List<String> collections = new ArrayList<>();
+ collections.add("person");
+ insterest.put("test", collections);
+ config.setInterestDbAndCollection(JSONObject.toJSONString(insterest));
Filter filter = new Filter(config);
- Assert.assertTrue(filter.filterDatabaseName("test"));
- Assert.assertFalse(filter.filterDatabaseName("test01"));
+ Assert.assertTrue(filter.filter(new CollectionMeta("test", "person")));
+ Assert.assertFalse(filter.filter(new CollectionMeta("test", "person01")));
}
@Test
public void testBlankDb() {
Filter filter = new Filter(config);
- Assert.assertTrue(filter.filterDatabaseName("test"));
- Assert.assertTrue(filter.filterDatabaseName("test01"));
+ Assert.assertTrue(filter.filter(new CollectionMeta("test" ,"test")));
+ Assert.assertTrue(filter.filter(new CollectionMeta("test1" ,"test01")));
}
@Test
- public void testSpecialCollection() {
- config.setInterestCollection("test,admin");
+ public void testAsterisk() {
+ List<String> collections = new ArrayList<>();
+ collections.add("*");
+ insterest.put("test", collections);
+ config.setInterestDbAndCollection(JSONObject.toJSONString(insterest));
Filter filter = new Filter(config);
- Assert.assertTrue(filter.filterCollectionName("test"));
- Assert.assertFalse(filter.filterCollectionName("test01"));
+ Assert.assertTrue(filter.filter(new CollectionMeta("test", "testsad")));
+ Assert.assertTrue(filter.filter(new CollectionMeta("test", "tests032")));
}
- @Test
- public void testBlankCollection() {
- Filter filter = new Filter(config);
- Assert.assertTrue(filter.filterCollectionName("test"));
- Assert.assertTrue(filter.filterCollectionName("test01"));
- }
-
@Test
public void testFilterEvent() {
diff --git a/src/test/java/org/apache/connect/mongo/MongoTest.java b/src/test/java/org/apache/connect/mongo/MongoTest.java
new file mode 100644
index 0000000..849c00c
--- /dev/null
+++ b/src/test/java/org/apache/connect/mongo/MongoTest.java
@@ -0,0 +1,105 @@
+package org.apache.connect.mongo;
+
+import com.alibaba.fastjson.JSONObject;
+import com.mongodb.ConnectionString;
+import com.mongodb.CursorType;
+import com.mongodb.MongoClientSettings;
+import com.mongodb.client.*;
+import com.mongodb.client.model.Filters;
+import io.openmessaging.connector.api.data.EntryType;
+import org.apache.connect.mongo.initsync.InitSync;
+import org.apache.connect.mongo.replicator.Constants;
+import org.apache.connect.mongo.replicator.Filter;
+import org.apache.connect.mongo.replicator.MongoReplicator;
+import org.apache.connect.mongo.replicator.event.DocumentConvertEvent;
+import org.apache.connect.mongo.replicator.event.OperationType;
+import org.apache.connect.mongo.replicator.event.ReplicationEvent;
+import org.bson.BsonTimestamp;
+import org.bson.Document;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class MongoTest {
+
+ private MongoClient mongoClient;
+
+ @Before
+ public void before() {
+ MongoClientSettings.Builder builder = MongoClientSettings.builder();
+ builder.applyConnectionString(new ConnectionString("mongodb://127.0.0.1:27077"));
+ mongoClient = MongoClients.create(builder.build());
+ }
+
+
+ @Test
+ public void testConvertEvent() {
+ Document oplog = new Document();
+ BsonTimestamp timestamp = new BsonTimestamp(1565074665, 10);
+ oplog.put(Constants.TIMESTAMP, timestamp);
+ oplog.put(Constants.NAMESPACE, "test.person");
+ oplog.put(Constants.HASH, 11111L);
+ oplog.put(Constants.OPERATIONTYPE, "i");
+ Document document = new Document();
+ document.put("test", "test");
+ oplog.put(Constants.OPERATION, document);
+ ReplicationEvent event = DocumentConvertEvent.convert(oplog);
+ Assert.assertEquals(timestamp, event.getTimestamp());
+ Assert.assertEquals("test.person", event.getNamespace());
+ Assert.assertTrue(11111L == event.getH());
+ Assert.assertEquals(OperationType.INSERT, event.getOperationType());
+ Assert.assertEquals(EntryType.CREATE, event.getEntryType());
+ Assert.assertEquals(document, event.getEventData().get());
+
+
+ }
+
+
+ @Test
+ public void testInitSyncCopy() throws NoSuchFieldException, IllegalAccessException, InterruptedException {
+ MongoCollection<Document> collection = mongoClient.getDatabase("test").getCollection("person");
+ collection.deleteMany(new Document());
+ int count = 100;
+ List<Document> documents = new ArrayList<>(count);
+ for (int i = 0; i < count; i++) {
+ Document document = new Document();
+ document.put("name", "test" + i);
+ document.put("age", i);
+ document.put("sex", i % 2 == 0 ? "boy" : "girl");
+ collection.insertOne(document);
+ documents.add(document);
+ }
+ MongoReplicatorConfig config = new MongoReplicatorConfig();
+ Map<String, List<String>> insterest = new HashMap<>();
+ List<String> collections = new ArrayList<>();
+ collections.add("*");
+ insterest.put("test", collections);
+ config.setInterestDbAndCollection(JSONObject.toJSONString(insterest));
+ MongoReplicator mongoReplicator = new MongoReplicator(config);
+ Field running = MongoReplicator.class.getDeclaredField("running");
+ running.setAccessible(true);
+ running.set(mongoReplicator, new AtomicBoolean(true));
+ InitSync initSync = new InitSync(config, mongoClient, new Filter(config), mongoReplicator);
+ initSync.start();
+ BlockingQueue<ReplicationEvent> queue = mongoReplicator.getQueue();
+ while (count > 0) {
+ count--;
+ ReplicationEvent event = queue.poll(100, TimeUnit.MILLISECONDS);
+ Assert.assertTrue(event.getOperationType().equals(OperationType.CREATED));
+ Assert.assertNotNull(event.getDocument());
+ Assert.assertTrue(documents.contains(event.getDocument()));
+ }
+
+
+
+ }
+}
diff --git a/src/test/java/org/apache/connect/mongo/ReplicatorTest.java b/src/test/java/org/apache/connect/mongo/ReplicatorTest.java
deleted file mode 100644
index b0319f1..0000000
--- a/src/test/java/org/apache/connect/mongo/ReplicatorTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package org.apache.connect.mongo;
-
-import org.apache.connect.mongo.replicator.MongoReplicator;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ReplicatorTest {
-
- private MongoReplicatorConfig config;
-
- @Before
- public void before() {
- config = new MongoReplicatorConfig();
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testNoPort() {
- config.setMongoAddr("127.0.0.1");
- MongoReplicator mongoReplicator = new MongoReplicator(config);
- mongoReplicator.start();
- }
-
-
- @Test
- public void testNoPort1() {
- config.setMongoAddr("127.0.0.1");
- config.setMongoPort(27012);
- MongoReplicator mongoReplicator = new MongoReplicator(config);
- mongoReplicator.start();
- }
-
-
-}