You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:22:30 UTC
[rocketmq-connect] 07/13: support multiple mongo replicaset
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit f95bf2257d3bb9c57cfb2194876e57af45c5023e
Author: 李平 <lp...@alibaba-inc.com>
AuthorDate: Tue Aug 13 15:59:38 2019 +0800
support multiple mongo replicaset
---
...ReplicatorConfig.java => SourceTaskConfig.java} | 35 ++---
.../mongo/connector/MongoSourceConnector.java | 4 +-
.../connect/mongo/connector/MongoSourceTask.java | 146 +++++++-------------
.../mongo/connector/builder/MongoDataEntry.java | 113 +++++++++++++++
.../apache/connect/mongo/initsync/InitSync.java | 61 ++++----
.../apache/connect/mongo/replicator/Constants.java | 6 -
.../apache/connect/mongo/replicator/Filter.java | 8 +-
.../connect/mongo/replicator/MongoReplicator.java | 153 ---------------------
.../connect/mongo/replicator/ReplicaSet.java | 101 ++++++++++++++
.../connect/mongo/replicator/ReplicaSetConfig.java | 139 +++++++++++++++++++
.../connect/mongo/replicator/ReplicaSets.java | 70 ++++++++++
.../mongo/replicator/ReplicaSetsContext.java | 129 +++++++++++++++++
.../connect/mongo/replicator/ReplicatorTask.java | 45 +++---
.../replicator/event/DocumentConvertEvent.java | 29 ----
.../mongo/replicator/event/EventConverter.java | 29 ++++
.../mongo/replicator/event/ReplicationEvent.java | 18 ++-
.../java/org/apache/connect/mongo/FilterTest.java | 28 ++--
.../connect/mongo/MongoSourceConnectorTest.java | 69 +++++++++-
.../java/org/apache/connect/mongo/MongoTest.java | 78 +++++++----
.../apache/connect/mongo/ReplicaContextTest.java | 34 +++++
.../org/apache/connect/mongo/ReplicaSetsTest.java | 65 +++++++++
21 files changed, 946 insertions(+), 414 deletions(-)
diff --git a/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java b/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java
similarity index 83%
rename from src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java
rename to src/main/java/org/apache/connect/mongo/SourceTaskConfig.java
index 9097640..b79b2e9 100644
--- a/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java
+++ b/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java
@@ -1,37 +1,37 @@
package org.apache.connect.mongo;
import io.openmessaging.KeyValue;
-import org.apache.commons.lang3.StringUtils;
import org.bson.BsonTimestamp;
import java.lang.reflect.Method;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
-public class MongoReplicatorConfig {
+public class SourceTaskConfig {
private String replicaSet;
private String mongoAddr;
private String mongoUserName;
private String mongoPassWord;
private String interestDbAndCollection;
- private int positionTimeStamp;
- private int positionInc;
- private boolean dataSync;
+ private String positionTimeStamp;
+ private String positionInc;
+ private String dataSync;
private int copyThread = Runtime.getRuntime().availableProcessors();
- public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
+ public static final Set<String> REQUEST_CONFIG = Collections.unmodifiableSet(new HashSet<String>() {
{
add("mongoAddr");
}
- };
+ });
- public int getPositionInc() {
+ public String getPositionInc() {
return positionInc;
}
- public void setPositionInc(int positionInc) {
+ public void setPositionInc(String positionInc) {
this.positionInc = positionInc;
}
@@ -43,11 +43,11 @@ public class MongoReplicatorConfig {
this.copyThread = copyThread;
}
- public int getPositionTimeStamp() {
+ public String getPositionTimeStamp() {
return positionTimeStamp;
}
- public void setPositionTimeStamp(int positionTimeStamp) {
+ public void setPositionTimeStamp(String positionTimeStamp) {
this.positionTimeStamp = positionTimeStamp;
}
@@ -85,11 +85,11 @@ public class MongoReplicatorConfig {
}
- public boolean getDataSync() {
+ public String getDataSync() {
return dataSync;
}
- public void setDataSync(boolean dataSync) {
+ public void setDataSync(String dataSync) {
this.dataSync = dataSync;
}
@@ -148,16 +148,9 @@ public class MongoReplicatorConfig {
}
}
- public String getDataSouce() {
- if (StringUtils.isBlank(replicaSet)) {
- return mongoAddr;
- }
- return replicaSet + ":" + mongoAddr;
- }
-
public BsonTimestamp getPosition() {
- return new BsonTimestamp(positionTimeStamp, positionInc);
+ return new BsonTimestamp(Integer.valueOf(positionTimeStamp), Integer.valueOf(positionInc));
}
}
diff --git a/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java b/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java
index 9e659c9..2b28ea2 100644
--- a/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java
+++ b/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java
@@ -3,7 +3,7 @@ package org.apache.connect.mongo.connector;
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.Task;
import io.openmessaging.connector.api.source.SourceConnector;
-import org.apache.connect.mongo.MongoReplicatorConfig;
+import org.apache.connect.mongo.SourceTaskConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -18,7 +18,7 @@ public class MongoSourceConnector extends SourceConnector {
@Override
public String verifyAndSetConfig(KeyValue config) {
- for (String requestKey : MongoReplicatorConfig.REQUEST_CONFIG) {
+ for (String requestKey : SourceTaskConfig.REQUEST_CONFIG) {
if (!config.containsKey(requestKey)) {
return "Request config key: " + requestKey;
}
diff --git a/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
index 7272878..407608a 100644
--- a/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
+++ b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
@@ -2,91 +2,71 @@ package org.apache.connect.mongo.connector;
import com.alibaba.fastjson.JSONObject;
import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.data.*;
+import io.openmessaging.connector.api.data.SourceDataEntry;
import io.openmessaging.connector.api.source.SourceTask;
-import org.apache.connect.mongo.MongoReplicatorConfig;
-import org.apache.connect.mongo.replicator.Constants;
-import org.apache.connect.mongo.replicator.MongoReplicator;
-import org.apache.connect.mongo.replicator.event.OperationType;
-import org.apache.connect.mongo.replicator.event.ReplicationEvent;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.connect.mongo.SourceTaskConfig;
+import org.apache.connect.mongo.replicator.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.List;
+import java.util.regex.Pattern;
public class MongoSourceTask extends SourceTask {
private Logger logger = LoggerFactory.getLogger(this.getClass());
- private MongoReplicator mongoReplicator;
+ private SourceTaskConfig sourceTaskConfig;
- private MongoReplicatorConfig replicatorConfig;
+ private ReplicaSets replicaSets;
- private String mongoSource;
+ private ReplicaSetsContext replicaSetsContext;
+
+ private Pattern pattern = Pattern.compile("^[-\\+]?[\\d]*$");
@Override
public Collection<SourceDataEntry> poll() {
- List<SourceDataEntry> res = new ArrayList<>();
- ReplicationEvent event = mongoReplicator.getQueue().poll();
- if (event == null) {
- return new ArrayList<>();
- }
- JSONObject position = position(event);
- Schema schema = new Schema();
- schema.setDataSource(event.getDatabaseName());
- schema.setName(event.getCollectionName());
- schema.setFields(new ArrayList<>());
- buildFieleds(schema);
- DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
- dataEntryBuilder.timestamp(System.currentTimeMillis())
- .queue(event.getNamespace().replace(".", "-").replace("$", ""))
- .entryType(event.getEntryType());
-
- if (event.getOperationType().ordinal() == OperationType.CREATED.ordinal()) {
- dataEntryBuilder.putFiled(Constants.CREATED, event.getDocument().toJson());
- dataEntryBuilder.putFiled(Constants.NAMESPACE, event.getNamespace());
- } else {
- dataEntryBuilder.putFiled(Constants.OPERATIONTYPE, event.getOperationType().name());
- dataEntryBuilder.putFiled(Constants.TIMESTAMP, event.getTimestamp().getValue());
- dataEntryBuilder.putFiled(Constants.VERSION, event.getV());
- dataEntryBuilder.putFiled(Constants.NAMESPACE, event.getNamespace());
- dataEntryBuilder.putFiled(Constants.PATCH, event.getEventData().isPresent() ? JSONObject.toJSONString(event.getEventData().get()) : "");
- dataEntryBuilder.putFiled(Constants.OBJECTID, event.getObjectId().isPresent() ? JSONObject.toJSONString(event.getObjectId().get()) : "");
- }
- SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
- ByteBuffer.wrap(mongoSource.getBytes(StandardCharsets.UTF_8)),
- ByteBuffer.wrap(position.toJSONString().getBytes(StandardCharsets.UTF_8)));
- res.add(sourceDataEntry);
- return res;
+ return replicaSetsContext.poll();
}
@Override
public void start(KeyValue config) {
try {
- replicatorConfig = new MongoReplicatorConfig();
- replicatorConfig.load(config);
- mongoReplicator = new MongoReplicator(replicatorConfig);
- mongoSource = replicatorConfig.getDataSouce();
- ByteBuffer position = this.context.positionStorageReader().getPosition(ByteBuffer.wrap(
- mongoSource.getBytes()));
-
- if (position != null && position.array().length > 0) {
- String positionJson = new String(position.array(), StandardCharsets.UTF_8);
- JSONObject jsonObject = JSONObject.parseObject(positionJson);
- replicatorConfig.setPositionTimeStamp(jsonObject.getIntValue("timeStamp"));
- replicatorConfig.setPositionInc(jsonObject.getIntValue("inc"));
- replicatorConfig.setDataSync(jsonObject.getBooleanValue(Constants.INITSYNC));
- } else {
- replicatorConfig.setDataSync(true);
- }
- mongoReplicator.start();
- }catch (Throwable throwable) {
- logger.info("task start error", throwable);
+ sourceTaskConfig = new SourceTaskConfig();
+ sourceTaskConfig.load(config);
+ replicaSetsContext = new ReplicaSetsContext(sourceTaskConfig);
+ replicaSets = ReplicaSets.create(sourceTaskConfig.getMongoAddr());
+ replicaSets.getReplicaConfigByName().forEach((replicaSetName, replicaSetConfig) -> {
+ ByteBuffer byteBuffer = this.context.positionStorageReader().getPosition(ByteBuffer.wrap(
+ replicaSetName.getBytes()));
+ if (byteBuffer != null && byteBuffer.array().length > 0) {
+ String positionJson = new String(byteBuffer.array(), StandardCharsets.UTF_8);
+ ReplicaSetConfig.Position position = JSONObject.parseObject(positionJson, ReplicaSetConfig.Position.class);
+ replicaSetConfig.setPosition(position);
+ } else {
+ ReplicaSetConfig.Position position = replicaSetConfig.emptyPosition();
+ position.setTimeStamp(sourceTaskConfig.getPositionTimeStamp() != null
+ && pattern.matcher(sourceTaskConfig.getPositionTimeStamp()).matches()
+ ? Integer.valueOf(sourceTaskConfig.getPositionTimeStamp()) : 0);
+ position.setInc(sourceTaskConfig.getPositionInc() != null
+ && pattern.matcher(sourceTaskConfig.getPositionInc()).matches()
+ ? Integer.valueOf(sourceTaskConfig.getPositionInc()) : 0);
+ position.setInitSync(StringUtils.equals(sourceTaskConfig.getDataSync(), Constants.INITSYNC) ? true : false);
+ replicaSetConfig.setPosition(position);
+ }
+
+ ReplicaSet replicaSet = new ReplicaSet(replicaSetConfig, replicaSetsContext);
+ replicaSetsContext.addReplicaSet(replicaSet);
+ replicaSet.start();
+ });
+
+
+ } catch (Throwable throwable) {
+ logger.error("task start error", throwable);
stop();
}
}
@@ -94,51 +74,19 @@ public class MongoSourceTask extends SourceTask {
@Override
public void stop() {
logger.info("shut down.....");
- mongoReplicator.shutdown();
+ replicaSetsContext.shutdown();
}
@Override
public void pause() {
- mongoReplicator.pause();
+ logger.info("pause replica task...");
+ replicaSetsContext.pause();
}
@Override
public void resume() {
- mongoReplicator.resume();
- }
-
- private void buildFieleds(Schema schema) {
- Field op = new Field(0, Constants.OPERATIONTYPE, FieldType.STRING);
- schema.getFields().add(op);
- Field time = new Field(1, Constants.TIMESTAMP, FieldType.INT64);
- schema.getFields().add(time);
- Field v = new Field(2, Constants.VERSION, FieldType.INT32);
- schema.getFields().add(v);
- Field namespace = new Field(3, Constants.NAMESPACE, FieldType.STRING);
- schema.getFields().add(namespace);
- Field operation = new Field(4, Constants.CREATED, FieldType.STRING);
- schema.getFields().add(operation);
- Field patch = new Field(5, Constants.PATCH, FieldType.STRING);
- schema.getFields().add(patch);
- Field objectId = new Field(6, Constants.OBJECTID, FieldType.STRING);
- schema.getFields().add(objectId);
+ logger.info("resume replica task...");
+ replicaSetsContext.resume();
}
- private JSONObject position(ReplicationEvent event) {
- JSONObject jsonObject = new JSONObject();
- switch (event.getOperationType()) {
- case CREATED:
- jsonObject.put(Constants.POSITION_TIMESTAMP, 0);
- jsonObject.put(Constants.POSITION_INC, 0);
- jsonObject.put(Constants.INITSYNC, true);
- break;
- default:
- jsonObject.put(Constants.POSITION_TIMESTAMP, event.getTimestamp().getTime());
- jsonObject.put(Constants.POSITION_INC, event.getTimestamp().getInc());
- jsonObject.put(Constants.INITSYNC, false);
- break;
- }
- return jsonObject;
-
- }
}
diff --git a/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java b/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java
new file mode 100644
index 0000000..7c0db7d
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java
@@ -0,0 +1,113 @@
+package org.apache.connect.mongo.connector.builder;
+
+
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.connector.api.data.*;
+import org.apache.connect.mongo.replicator.Constants;
+import org.apache.connect.mongo.replicator.ReplicaSetConfig;
+import org.apache.connect.mongo.replicator.event.OperationType;
+import org.apache.connect.mongo.replicator.event.ReplicationEvent;
+import org.bson.BsonTimestamp;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+
+import static org.apache.connect.mongo.replicator.Constants.*;
+
+public class MongoDataEntry {
+
+ private static String SCHEMA_CREATED_NAME = "mongo_created";
+ private static String SCHEMA_OPLOG_NAME = "mongo_oplog";
+
+ public static SourceDataEntry createSouceDataEntry(ReplicationEvent event, ReplicaSetConfig replicaSetConfig) {
+
+ DataEntryBuilder dataEntryBuilder;
+
+ if (event.getOperationType().equals(OperationType.CREATED)) {
+ Schema schema = createdSchema(replicaSetConfig.getReplicaSetName());
+ dataEntryBuilder = new DataEntryBuilder(schema);
+ dataEntryBuilder.timestamp(System.currentTimeMillis())
+ .queue(event.getNamespace().replace(".", "-").replace("$", "-"))
+ .entryType(event.getEntryType());
+
+ dataEntryBuilder.putFiled(CREATED, event.getDocument().toJson());
+ dataEntryBuilder.putFiled(NAMESPACE, event.getNamespace());
+
+ } else {
+ Schema schema = oplogSchema(replicaSetConfig.getReplicaSetName());
+ dataEntryBuilder = new DataEntryBuilder(schema);
+ dataEntryBuilder.timestamp(System.currentTimeMillis())
+ .queue(event.getNamespace().replace(".", "-").replace("$", "-"))
+ .entryType(event.getEntryType());
+ dataEntryBuilder.putFiled(OPERATIONTYPE, event.getOperationType().name());
+ dataEntryBuilder.putFiled(TIMESTAMP, event.getTimestamp().getValue());
+ dataEntryBuilder.putFiled(VERSION, event.getV());
+ dataEntryBuilder.putFiled(NAMESPACE, event.getNamespace());
+ dataEntryBuilder.putFiled(PATCH, event.getEventData().isPresent() ? JSONObject.toJSONString(event.getEventData().get()) : "");
+ dataEntryBuilder.putFiled(OBJECTID, event.getObjectId().isPresent() ? JSONObject.toJSONString(event.getObjectId().get()) : "");
+ }
+
+
+ String position = createPosition(event, replicaSetConfig);
+ SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
+ ByteBuffer.wrap(replicaSetConfig.getReplicaSetName().getBytes(StandardCharsets.UTF_8)),
+ ByteBuffer.wrap(position.getBytes(StandardCharsets.UTF_8)));
+ return sourceDataEntry;
+ }
+
+
+ private static String createPosition(ReplicationEvent event, ReplicaSetConfig replicaSetConfig) {
+ ReplicaSetConfig.Position position = replicaSetConfig.emptyPosition();
+ BsonTimestamp timestamp = event.getTimestamp();
+ position.setInc(timestamp != null ? timestamp.getInc() : 0);
+ position.setTimeStamp(timestamp != null ? timestamp.getTime() : 0);
+ position.setInitSync(event.getOperationType().equals(OperationType.CREATED) ? true : false);
+ return JSONObject.toJSONString(position);
+
+ }
+
+ private static Schema createdSchema(String dataSourceName) {
+ Schema schema = new Schema();
+ schema.setDataSource(dataSourceName);
+ schema.setName(SCHEMA_CREATED_NAME);
+ schema.setFields(new ArrayList<>());
+ createdField(schema);
+ return schema;
+ }
+
+
+ private static Schema oplogSchema(String dataSourceName) {
+ Schema schema = new Schema();
+ schema.setDataSource(dataSourceName);
+ schema.setName(SCHEMA_OPLOG_NAME);
+ oplogField(schema);
+ return schema;
+ }
+
+
+ private static void createdField(Schema schema) {
+ Field namespace = new Field(0, NAMESPACE, FieldType.STRING);
+ schema.getFields().add(namespace);
+ Field operation = new Field(1, Constants.CREATED, FieldType.STRING);
+ schema.getFields().add(operation);
+ }
+
+ private static void oplogField(Schema schema) {
+ schema.setFields(new ArrayList<>());
+ Field op = new Field(0, OPERATIONTYPE, FieldType.STRING);
+ schema.getFields().add(op);
+ Field time = new Field(1, TIMESTAMP, FieldType.INT64);
+ schema.getFields().add(time);
+ Field v = new Field(2, VERSION, FieldType.INT32);
+ schema.getFields().add(v);
+ Field namespace = new Field(3, NAMESPACE, FieldType.STRING);
+ schema.getFields().add(namespace);
+ Field patch = new Field(4, PATCH, FieldType.STRING);
+ schema.getFields().add(patch);
+ Field objectId = new Field(5, OBJECTID, FieldType.STRING);
+ schema.getFields().add(objectId);
+ }
+
+
+}
diff --git a/src/main/java/org/apache/connect/mongo/initsync/InitSync.java b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
index e12412b..d92e968 100644
--- a/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
+++ b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
@@ -3,45 +3,48 @@ package org.apache.connect.mongo.initsync;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoIterable;
-import org.apache.connect.mongo.replicator.event.DocumentConvertEvent;
+import org.apache.connect.mongo.replicator.ReplicaSet;
+import org.apache.connect.mongo.replicator.ReplicaSetConfig;
+import org.apache.connect.mongo.replicator.ReplicaSetsContext;
+import org.apache.connect.mongo.replicator.event.EventConverter;
import org.apache.connect.mongo.replicator.event.OperationType;
import org.apache.connect.mongo.replicator.event.ReplicationEvent;
-import org.apache.connect.mongo.replicator.Filter;
-import org.apache.connect.mongo.replicator.MongoReplicator;
-import org.apache.connect.mongo.MongoReplicatorConfig;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.Set;
-import java.util.concurrent.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class InitSync {
private Logger logger = LoggerFactory.getLogger(this.getClass());
- private MongoReplicatorConfig mongoReplicatorConfig;
+ private ReplicaSetConfig replicaSetConfig;
private ExecutorService copyExecutor;
private MongoClient mongoClient;
- private Filter filter;
+ private ReplicaSetsContext context;
private int copyThreadCount;
private Set<CollectionMeta> interestCollections;
private CountDownLatch countDownLatch;
- private MongoReplicator mongoReplicator;
+ private ReplicaSet replicaSet;
- public InitSync(MongoReplicatorConfig mongoReplicatorConfig, MongoClient mongoClient, Filter filter, MongoReplicator mongoReplicator) {
- this.mongoReplicatorConfig = mongoReplicatorConfig;
+ public InitSync(ReplicaSetConfig replicaSetConfig, MongoClient mongoClient, ReplicaSetsContext context, ReplicaSet replicaSet) {
+ this.replicaSetConfig = replicaSetConfig;
this.mongoClient = mongoClient;
- this.filter = filter;
- this.mongoReplicator = mongoReplicator;
+ this.context = context;
+ this.replicaSet = replicaSet;
init();
}
public void start() {
for (CollectionMeta collectionMeta : interestCollections) {
- copyExecutor.submit(new CopyRunner(mongoClient, countDownLatch, collectionMeta, mongoReplicator));
+ copyExecutor.submit(new CopyRunner(mongoClient, countDownLatch, collectionMeta, replicaSet));
}
try {
countDownLatch.await();
@@ -53,7 +56,7 @@ public class InitSync {
private void init() {
interestCollections = getInterestCollection();
- copyThreadCount = Math.min(interestCollections.size(), mongoReplicatorConfig.getCopyThread());
+ copyThreadCount = Math.min(interestCollections.size(), context.getCopyThread());
copyExecutor = Executors.newFixedThreadPool(copyThreadCount, new ThreadFactory() {
AtomicInteger threads = new AtomicInteger();
@@ -76,7 +79,7 @@ public class InitSync {
while (collIterator.hasNext()) {
String collectionName = collIterator.next();
CollectionMeta collectionMeta = new CollectionMeta(dataBaseName, collectionName);
- if (filter.filter(collectionMeta)) {
+ if (context.filterMeta(collectionMeta)) {
res.add(collectionMeta);
}
}
@@ -92,40 +95,46 @@ public class InitSync {
private MongoClient mongoClient;
private CountDownLatch countDownLatch;
private CollectionMeta collectionMeta;
- private MongoReplicator mongoReplicator;
+ private ReplicaSet replicaSet;
- public CopyRunner(MongoClient mongoClient, CountDownLatch countDownLatch, CollectionMeta collectionMeta, MongoReplicator mongoReplicator) {
+ public CopyRunner(MongoClient mongoClient, CountDownLatch countDownLatch, CollectionMeta collectionMeta, ReplicaSet replicaSet) {
this.mongoClient = mongoClient;
this.countDownLatch = countDownLatch;
this.collectionMeta = collectionMeta;
- this.mongoReplicator = mongoReplicator;
+ this.replicaSet = replicaSet;
}
@Override
public void run() {
-
+ logger.info("start copy database:{}, collection:{}", collectionMeta.getDatabaseName(), collectionMeta.getCollectionName());
+ int count = 0;
try {
-
MongoCursor<Document> mongoCursor = mongoClient.getDatabase(collectionMeta.getDatabaseName())
.getCollection(collectionMeta.getCollectionName())
.find()
.batchSize(200)
.iterator();
-
- while (mongoReplicator.isRuning() && mongoCursor.hasNext()) {
+ while (replicaSet.isRuning() && mongoCursor.hasNext()) {
+ if (context.initSyncAbort()) {
+ logger.info("init sync database:{}, collection:{} abort, has copy:{} document", collectionMeta.getDatabaseName(), collectionMeta.getCollectionName(), count);
+ return;
+ }
+ count++;
Document document = mongoCursor.next();
- ReplicationEvent event = DocumentConvertEvent.convert(document);
+ ReplicationEvent event = EventConverter.convert(document, replicaSetConfig.getReplicaSetName());
event.setOperationType(OperationType.CREATED);
event.setNamespace(collectionMeta.getNameSpace());
- mongoReplicator.publishEvent(event);
+ context.publishEvent(event, replicaSetConfig);
}
} catch (Exception e) {
- logger.error("init sync database:{}, collection:{} error", collectionMeta.getDatabaseName(), collectionMeta.getNameSpace());
+ context.initSyncError();
+ logger.error("init sync database:{}, collection:{} error", collectionMeta.getDatabaseName(), collectionMeta.getNameSpace(), e);
} finally {
countDownLatch.countDown();
+ replicaSet.shutdown();
}
- logger.info("database:{}, collection:{}, init sync done", collectionMeta.getDatabaseName(), collectionMeta.getCollectionName());
+ logger.info("database:{}, collection:{}, copy {} documents, init sync done", collectionMeta.getDatabaseName(), collectionMeta.getCollectionName(), count);
}
}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/Constants.java b/src/main/java/org/apache/connect/mongo/replicator/Constants.java
index 668fd91..1a91a57 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/Constants.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/Constants.java
@@ -3,8 +3,6 @@ package org.apache.connect.mongo.replicator;
public class Constants {
- public static final String APPLICATION_NAME = "java-mongo-replicator";
-
public static final String MONGO_LOCAL_DATABASE = "local";
public static final String MONGO_OPLOG_RS = "oplog.rs";
@@ -23,11 +21,7 @@ public class Constants {
public static final String CREATED = "created";
public static final String PATCH = "patch";
- public static final String INITIAL = "initial";
-
- public static final String POSITION_TIMESTAMP = "timeStamp";
- public static final String POSITION_INC = "inc";
public static final String INITSYNC = "initSync";
}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/Filter.java b/src/main/java/org/apache/connect/mongo/replicator/Filter.java
index 4a62e41..3e431a3 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/Filter.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/Filter.java
@@ -4,7 +4,7 @@ package org.apache.connect.mongo.replicator;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import org.apache.commons.lang3.StringUtils;
-import org.apache.connect.mongo.MongoReplicatorConfig;
+import org.apache.connect.mongo.SourceTaskConfig;
import org.apache.connect.mongo.initsync.CollectionMeta;
import org.apache.connect.mongo.replicator.event.OperationType;
import org.apache.connect.mongo.replicator.event.ReplicationEvent;
@@ -21,9 +21,9 @@ public class Filter {
private Function<OperationType, Boolean> notNoopFilter;
- public Filter(MongoReplicatorConfig mongoReplicatorConfig) {
+ public Filter(SourceTaskConfig sourceTaskConfig) {
- String interestDbAndCollection = mongoReplicatorConfig.getInterestDbAndCollection();
+ String interestDbAndCollection = sourceTaskConfig.getInterestDbAndCollection();
if (StringUtils.isNotBlank(interestDbAndCollection)) {
JSONObject jsonObject = JSONObject.parseObject(interestDbAndCollection);
@@ -57,7 +57,7 @@ public class Filter {
}
- public boolean filter(CollectionMeta collectionMeta) {
+ public boolean filterMeta(CollectionMeta collectionMeta) {
return dbAndCollectionFilter.apply(collectionMeta);
}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java b/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java
deleted file mode 100644
index 60b8d3d..0000000
--- a/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java
+++ /dev/null
@@ -1,153 +0,0 @@
-package org.apache.connect.mongo.replicator;
-
-import com.mongodb.ConnectionString;
-import com.mongodb.MongoClientSettings;
-import com.mongodb.client.MongoClient;
-import com.mongodb.client.MongoClients;
-import com.mongodb.client.MongoDatabase;
-import com.mongodb.client.MongoIterable;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.connect.mongo.MongoReplicatorConfig;
-import org.apache.connect.mongo.replicator.event.ReplicationEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.apache.connect.mongo.replicator.Constants.*;
-
-
-public class MongoReplicator {
-
- private Logger logger = LoggerFactory.getLogger(this.getClass());
-
- private AtomicBoolean running = new AtomicBoolean();
-
- private MongoReplicatorConfig mongoReplicatorConfig;
-
- private MongoClientSettings clientSettings;
-
- private ConnectionString connectionString;
-
- private MongoClient mongoClient;
-
- private BlockingQueue<ReplicationEvent> queue = new LinkedBlockingQueue<>();
-
- private Filter filter;
-
- private ExecutorService executorService;
-
- private volatile boolean pause = false;
-
- public MongoReplicator(MongoReplicatorConfig mongoReplicatorConfig) {
- this.mongoReplicatorConfig = mongoReplicatorConfig;
- this.filter = new Filter(mongoReplicatorConfig);
- this.executorService = Executors.newSingleThreadExecutor((r) ->new Thread(r, "real_time_replica_thread"));
-
- buildConnectionString();
- }
-
- public void start() {
-
- try {
- if (!running.compareAndSet(false, true)) {
- logger.info("the java mongo replica already start");
- return;
- }
-
- this.clientSettings = MongoClientSettings.builder()
- .applicationName(APPLICATION_NAME)
- .applyConnectionString(connectionString)
- .build();
- this.mongoClient = MongoClients.create(clientSettings);
- this.isReplicaMongo();
- executorService.submit(new ReplicatorTask(this, mongoClient, mongoReplicatorConfig, filter));
- }catch (Exception e) {
- logger.info("start replicator error", e);
- shutdown();
- }
- }
-
-
- private void buildConnectionString() {
- StringBuilder sb = new StringBuilder();
- sb.append("mongodb://");
- if (StringUtils.isNotBlank(mongoReplicatorConfig.getMongoUserName())
- && StringUtils.isNotBlank(mongoReplicatorConfig.getMongoPassWord())) {
- sb.append(mongoReplicatorConfig.getMongoUserName());
- sb.append(":");
- sb.append(mongoReplicatorConfig.getMongoPassWord());
- sb.append("@");
-
- }
- sb.append(mongoReplicatorConfig.getMongoAddr());
- sb.append("/");
- if (StringUtils.isNotBlank(mongoReplicatorConfig.getReplicaSet())) {
- sb.append("?");
- sb.append("replicaSet=");
- sb.append(mongoReplicatorConfig.getReplicaSet());
- }
- this.connectionString = new ConnectionString(sb.toString());
- }
-
-
- public boolean isReplicaMongo() {
- MongoDatabase local = mongoClient.getDatabase(MONGO_LOCAL_DATABASE);
- MongoIterable<String> collectionNames = local.listCollectionNames();
- for (String collectionName : collectionNames) {
- if (MONGO_OPLOG_RS.equals(collectionName)) {
- return true;
- }
- }
- this.shutdown();
- throw new IllegalStateException(String.format("url:%s, set:%s is not replica", mongoReplicatorConfig.getMongoAddr(), mongoReplicatorConfig.getReplicaSet()));
- }
-
- public void shutdown() {
- if (running.compareAndSet(true, false)) {
- if (!this.executorService.isShutdown()) {
- executorService.shutdown();
- }
- if (this.mongoClient != null) {
- this.mongoClient.close();
- }
- }
-
- }
-
- public void publishEvent(ReplicationEvent replicationEvent) {
- while (true) {
- try {
- queue.put(replicationEvent);
- break;
- } catch (Exception e) {
- }
- }
- }
-
-
-
- public void pause() {
- pause = true;
- }
-
- public void resume() {
- pause = false;
- }
-
- public boolean isPause() {
- return pause;
- }
-
- public boolean isRuning() {
- return running.get();
- }
-
- public BlockingQueue<ReplicationEvent> getQueue() {
- return queue;
- }
-}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java
new file mode 100644
index 0000000..b141c2b
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java
@@ -0,0 +1,101 @@
+package org.apache.connect.mongo.replicator;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.MongoIterable;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.connect.mongo.replicator.Constants.MONGO_LOCAL_DATABASE;
+import static org.apache.connect.mongo.replicator.Constants.MONGO_OPLOG_RS;
+
+
+public class ReplicaSet {
+
+ private Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private AtomicBoolean running = new AtomicBoolean();
+
+ private ReplicaSetConfig replicaSetConfig;
+
+
+ private ReplicaSetsContext replicaSetsContext;
+
+ private MongoClient mongoClient;
+
+ private ExecutorService executorService;
+
+ private volatile boolean pause = false;
+
+ public ReplicaSet(ReplicaSetConfig replicaSetConfig, ReplicaSetsContext replicaSetsContext) {
+ this.replicaSetConfig = replicaSetConfig;
+ this.replicaSetsContext = replicaSetsContext;
+ this.executorService = Executors.newSingleThreadExecutor((r) -> new Thread(r, "real_time_replica_" + replicaSetConfig.getReplicaSetName() + "thread"));
+
+ }
+
+ public void start() {
+
+ try {
+ if (!running.compareAndSet(false, true)) {
+ logger.info("the java mongo replica already start");
+ return;
+ }
+ this.mongoClient = replicaSetsContext.createMongoClient(replicaSetConfig);
+ this.isReplicaMongo();
+ executorService.submit(new ReplicatorTask(this, mongoClient, replicaSetConfig, replicaSetsContext));
+ } catch (Exception e) {
+ logger.error("start replicator:{} error", replicaSetConfig, e);
+ shutdown();
+ }
+ }
+
+
+ public boolean isReplicaMongo() {
+ MongoDatabase local = mongoClient.getDatabase(MONGO_LOCAL_DATABASE);
+ MongoIterable<String> collectionNames = local.listCollectionNames();
+ MongoCursor<String> iterator = collectionNames.iterator();
+ while (iterator.hasNext()) {
+ if (StringUtils.equals(MONGO_OPLOG_RS, iterator.next())) {
+ return true;
+ }
+ }
+ this.shutdown();
+ throw new IllegalStateException(String.format("url:%s, is not replica", replicaSetConfig.getHost()));
+ }
+
+ public void shutdown() {
+ if (running.compareAndSet(true, false)) {
+ if (!this.executorService.isShutdown()) {
+ executorService.shutdown();
+ }
+ if (this.mongoClient != null) {
+ this.mongoClient.close();
+ }
+ }
+
+ }
+
+
+ public void pause() {
+ pause = true;
+ }
+
+ public void resume() {
+ pause = false;
+ }
+
+ public boolean isPause() {
+ return pause;
+ }
+
+ public boolean isRuning() {
+ return running.get();
+ }
+}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java
new file mode 100644
index 0000000..4b8d148
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java
@@ -0,0 +1,139 @@
+package org.apache.connect.mongo.replicator;
+
+import org.bson.BsonTimestamp;
+
+import java.util.Objects;
+
+public class ReplicaSetConfig {
+
+
+ private String shardName;
+ private String replicaSetName;
+ private String host;
+ private Position position;
+
+
+ public Position getPosition() {
+ return position;
+ }
+
+ public void setPosition(Position position) {
+ this.position = position;
+ }
+
+ public String getShardName() {
+ return shardName;
+ }
+
+ public void setShardName(String shardName) {
+ this.shardName = shardName;
+ }
+
+ public String getReplicaSetName() {
+ return replicaSetName;
+ }
+
+ public void setReplicaSetName(String replicaSetName) {
+ this.replicaSetName = replicaSetName;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public ReplicaSetConfig(String shardName, String replicaSetName, String host) {
+ this.shardName = shardName;
+ this.replicaSetName = replicaSetName;
+ this.host = host;
+ }
+
+ public Position emptyPosition() {
+ return new Position(0, 0, true);
+ }
+
+
+ public class Position {
+ private int timeStamp;
+ private int inc;
+ private boolean initSync;
+
+
+ public int getTimeStamp() {
+ return timeStamp;
+ }
+
+ public void setTimeStamp(int timeStamp) {
+ this.timeStamp = timeStamp;
+ }
+
+ public int getInc() {
+ return inc;
+ }
+
+ public void setInc(int inc) {
+ this.inc = inc;
+ }
+
+ public boolean isInitSync() {
+ return initSync;
+ }
+
+ public void setInitSync(boolean initSync) {
+ this.initSync = initSync;
+ }
+
+
+ public Position(int timeStamp, int inc, boolean initSync) {
+ this.timeStamp = timeStamp;
+ this.inc = inc;
+ this.initSync = initSync;
+ }
+
+ public boolean isValid() {
+ return timeStamp > 0;
+ }
+
+ public BsonTimestamp converBsonTimeStamp() {
+ return new BsonTimestamp(timeStamp, inc);
+ }
+
+ @Override
+ public String toString() {
+ return "Position{" +
+ "timeStamp=" + timeStamp +
+ ", inc=" + inc +
+ ", initSync=" + initSync +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Position position = (Position) o;
+ return timeStamp == position.timeStamp &&
+ inc == position.inc &&
+ initSync == position.initSync;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(timeStamp, inc, initSync);
+ }
+ }
+
+
+ @Override
+ public String toString() {
+ return "ReplicaSetConfig{" +
+ "shardName='" + shardName + '\'' +
+ ", replicaSetName='" + replicaSetName + '\'' +
+ ", host='" + host + '\'' +
+ ", position=" + position +
+ '}';
+ }
+}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSets.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSets.java
new file mode 100644
index 0000000..9184b90
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSets.java
@@ -0,0 +1,70 @@
+package org.apache.connect.mongo.replicator;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.Validate;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class ReplicaSets {
+
+ private static final Pattern HOST_PATTERN = Pattern.compile("((([^=]+)[=])?(([^/]+)\\/))?(.+)");
+
+
+ private final Map<String, ReplicaSetConfig> replicaConfigByName = new HashMap<>();
+
+
+ public ReplicaSets(Set<ReplicaSetConfig> replicaSetConfigs) {
+ replicaSetConfigs.forEach(replicaSetConfig -> {
+ if (StringUtils.isNotBlank(replicaSetConfig.getReplicaSetName())) {
+ replicaConfigByName.put(replicaSetConfig.getReplicaSetName(), replicaSetConfig);
+ }
+ });
+
+ validate();
+ }
+
+ public static ReplicaSets create(String hosts) {
+ Set<ReplicaSetConfig> replicaSetConfigs = new HashSet<>();
+ if (hosts != null) {
+ for (String replicaSetStr : StringUtils.split(hosts.trim(), ";")) {
+ if (StringUtils.isNotBlank(replicaSetStr)) {
+ ReplicaSetConfig replicaSetConfig = parseReplicaSetStr(replicaSetStr);
+ if (replicaSetConfig != null) {
+ replicaSetConfigs.add(replicaSetConfig);
+ }
+ }
+ }
+ }
+ return new ReplicaSets(replicaSetConfigs);
+ }
+
+
+ private static ReplicaSetConfig parseReplicaSetStr(String hosts) {
+ if (hosts != null) {
+ Matcher matcher = HOST_PATTERN.matcher(hosts);
+ if (matcher.matches()) {
+ String shard = matcher.group(3);
+ String replicaSetName = matcher.group(5);
+ String host = matcher.group(6);
+ if (host != null && host.trim().length() != 0) {
+ return new ReplicaSetConfig(shard, replicaSetName, host);
+ }
+ }
+ }
+ return null;
+ }
+
+ private void validate() {
+ Validate.isTrue(replicaConfigByName.size() > 0, "task config mongoAdd need special replicaSet addr");
+
+ }
+
+ public Map<String, ReplicaSetConfig> getReplicaConfigByName() {
+ return replicaConfigByName;
+ }
+}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java
new file mode 100644
index 0000000..f66ca14
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java
@@ -0,0 +1,129 @@
+package org.apache.connect.mongo.replicator;
+
+import com.mongodb.ConnectionString;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import io.openmessaging.connector.api.data.SourceDataEntry;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.connect.mongo.SourceTaskConfig;
+import org.apache.connect.mongo.connector.builder.MongoDataEntry;
+import org.apache.connect.mongo.initsync.CollectionMeta;
+import org.apache.connect.mongo.replicator.event.ReplicationEvent;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ReplicaSetsContext {
+
+ private BlockingQueue<SourceDataEntry> dataEntryQueue;
+
+ private SourceTaskConfig taskConfig;
+
+ private List<ReplicaSet> replicaSets;
+
+ private AtomicBoolean initSyncAbort = new AtomicBoolean();
+
+ private Filter filter;
+
+ public ReplicaSetsContext(SourceTaskConfig taskConfig) {
+ this.taskConfig = taskConfig;
+ this.replicaSets = new CopyOnWriteArrayList<>();
+ this.dataEntryQueue = new LinkedBlockingDeque<>();
+ this.filter = new Filter(taskConfig);
+ }
+
+
+ public MongoClient createMongoClient(ReplicaSetConfig replicaSetConfig) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("mongodb://");
+ if (StringUtils.isNotBlank(taskConfig.getMongoUserName())
+ && StringUtils.isNotBlank(taskConfig.getMongoPassWord())) {
+ sb.append(taskConfig.getMongoUserName());
+ sb.append(":");
+ sb.append(taskConfig.getMongoPassWord());
+ sb.append("@");
+
+ }
+ sb.append(replicaSetConfig.getHost());
+ sb.append("/");
+ if (StringUtils.isNotBlank(replicaSetConfig.getReplicaSetName())) {
+ sb.append("?");
+ sb.append("replicaSet=");
+ sb.append(replicaSetConfig.getReplicaSetName());
+ }
+ ConnectionString connectionString = new ConnectionString(sb.toString());
+ return MongoClients.create(connectionString);
+ }
+
+
+ public boolean filterEvent(ReplicationEvent event) {
+ return filter.filterEvent(event);
+ }
+
+
+ public boolean filterMeta(CollectionMeta collectionMeta) {
+ return filter.filterMeta(collectionMeta);
+ }
+
+
+ public int getCopyThread() {
+ return taskConfig.getCopyThread() > 0 ? taskConfig.getCopyThread() : Runtime.getRuntime().availableProcessors();
+ }
+
+
+ public void addReplicaSet(ReplicaSet replicaSet) {
+ this.replicaSets.add(replicaSet);
+ }
+
+
+ public void shutdown() {
+ replicaSets.forEach(ReplicaSet::shutdown);
+ }
+
+ public void pause() {
+ replicaSets.forEach(ReplicaSet::pause);
+ }
+
+
+ public void resume() {
+ replicaSets.forEach(ReplicaSet::resume);
+ }
+
+
+ public void publishEvent(ReplicationEvent event, ReplicaSetConfig replicaSetConfig) {
+ SourceDataEntry sourceDataEntry = MongoDataEntry.createSouceDataEntry(event, replicaSetConfig);
+ while (true) {
+ try {
+ dataEntryQueue.put(sourceDataEntry);
+ break;
+ } catch (InterruptedException e) {
+ }
+ }
+
+ }
+
+ public Collection<SourceDataEntry> poll() {
+ List<SourceDataEntry> res = new ArrayList<>();
+ if (dataEntryQueue.drainTo(res, 20) == 0) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ }
+ }
+ return res;
+ }
+
+ public boolean initSyncAbort() {
+ return initSyncAbort.get();
+ }
+
+ public void initSyncError() {
+ initSyncAbort.set(true);
+ }
+
+}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
index 766225f..bf4ebac 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
@@ -6,9 +6,8 @@ import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
-import org.apache.connect.mongo.MongoReplicatorConfig;
import org.apache.connect.mongo.initsync.InitSync;
-import org.apache.connect.mongo.replicator.event.DocumentConvertEvent;
+import org.apache.connect.mongo.replicator.event.EventConverter;
import org.apache.connect.mongo.replicator.event.ReplicationEvent;
import org.bson.Document;
import org.slf4j.Logger;
@@ -19,34 +18,34 @@ public class ReplicatorTask implements Runnable {
private Logger logger = LoggerFactory.getLogger(this.getClass());
- private MongoReplicator mongoReplicator;
+ private ReplicaSet replicaSet;
private MongoClient mongoClient;
- private MongoReplicatorConfig mongoReplicatorConfig;
+ private ReplicaSetConfig replicaSetConfig;
- private Filter filter;
+ private ReplicaSetsContext replicaSetsContext;
- public ReplicatorTask(MongoReplicator mongoReplicator, MongoClient mongoClient, MongoReplicatorConfig mongoReplicatorConfig, Filter filter) {
- this.mongoReplicator = mongoReplicator;
- this.mongoReplicatorConfig = mongoReplicatorConfig;
+ public ReplicatorTask(ReplicaSet replicaSet, MongoClient mongoClient, ReplicaSetConfig replicaSetConfig, ReplicaSetsContext replicaSetsContext) {
+ this.replicaSet = replicaSet;
+ this.replicaSetConfig = replicaSetConfig;
this.mongoClient = mongoClient;
- this.filter = filter;
+ this.replicaSetsContext = replicaSetsContext;
}
@Override
public void run() {
- if (mongoReplicatorConfig.getDataSync()) {
- InitSync initSync = new InitSync(mongoReplicatorConfig, mongoClient, filter, mongoReplicator);
+ if (replicaSetConfig.getPosition() == null || replicaSetConfig.getPosition().isInitSync()) {
+ InitSync initSync = new InitSync(replicaSetConfig, mongoClient, replicaSetsContext, replicaSet);
initSync.start();
}
MongoDatabase localDataBase = mongoClient.getDatabase(Constants.MONGO_LOCAL_DATABASE);
FindIterable<Document> iterable;
- if (mongoReplicatorConfig.getPositionTimeStamp() > 0 && mongoReplicatorConfig.getPositionTimeStamp() < System.currentTimeMillis()) {
+ if (replicaSetConfig.getPosition().isValid()) {
iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find(
- Filters.gt("ts", mongoReplicatorConfig.getPosition()));
+ Filters.gt("ts", replicaSetConfig.getPosition().converBsonTimeStamp()));
} else {
iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find();
}
@@ -56,26 +55,28 @@ public class ReplicatorTask implements Runnable {
.batchSize(200)
.iterator();
- while (mongoReplicator.isRuning()) {
+ while (replicaSet.isRuning()) {
try {
executorCursor(cursor);
- Thread.sleep(100);
} catch (Exception e) {
- e.printStackTrace();
+ logger.error("replicaSet:{} shutdown.....", replicaSetConfig, e);
} finally {
- logger.error("mongoReplicator shutdown.....");
- mongoReplicator.shutdown();
+ if (cursor != null) {
+ cursor.close();
+ }
+ replicaSet.shutdown();
}
}
+ logger.info("replicaSet:{}, already shutdown, replicaTask end of life cycle", replicaSetConfig);
}
private void executorCursor(MongoCursor<Document> cursor) {
- while (cursor.hasNext() && !mongoReplicator.isPause()) {
+ while (cursor.hasNext() && !replicaSet.isPause()) {
Document document = cursor.next();
- ReplicationEvent event = DocumentConvertEvent.convert(document);
- if (filter.filterEvent(event)) {
- mongoReplicator.publishEvent(event);
+ ReplicationEvent event = EventConverter.convert(document, replicaSetConfig.getReplicaSetName());
+ if (replicaSetsContext.filterEvent(event)) {
+ replicaSetsContext.publishEvent(event, replicaSetConfig);
}
}
}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/event/DocumentConvertEvent.java b/src/main/java/org/apache/connect/mongo/replicator/event/DocumentConvertEvent.java
deleted file mode 100644
index a18aa52..0000000
--- a/src/main/java/org/apache/connect/mongo/replicator/event/DocumentConvertEvent.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.apache.connect.mongo.replicator.event;
-
-import org.bson.BsonTimestamp;
-import org.bson.Document;
-
-import java.util.Optional;
-
-import static org.apache.connect.mongo.replicator.Constants.*;
-
-
-public class DocumentConvertEvent {
-
-
- public static ReplicationEvent convert(Document document) {
-
- OperationType operationType = OperationType.getOperationType(document.getString(OPERATIONTYPE));
- BsonTimestamp timestamp = (BsonTimestamp) document.get(TIMESTAMP);
-// Long t = document.getLong("t");
- Long h = document.getLong(HASH);
- Integer v = document.getInteger(VERSION);
- String nameSpace = document.getString(NAMESPACE);
-// String uuid = document.getString("uuid");
-// Date wall = document.getDate("wall");
- Document operation = document.get(OPERATION, Document.class);
- Document objectID = document.get(OBJECTID, Document.class);
- return new ReplicationEvent(operationType, timestamp, v, h, nameSpace, Optional.ofNullable(operation), Optional.ofNullable(objectID), document);
- }
-
-}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/event/EventConverter.java b/src/main/java/org/apache/connect/mongo/replicator/event/EventConverter.java
new file mode 100644
index 0000000..57b4ac8
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/replicator/event/EventConverter.java
@@ -0,0 +1,29 @@
+package org.apache.connect.mongo.replicator.event;
+
+import org.bson.BsonTimestamp;
+import org.bson.Document;
+
+import java.util.Optional;
+
+import static org.apache.connect.mongo.replicator.Constants.*;
+
+
+public class EventConverter {
+
+
+ public static ReplicationEvent convert(Document document, String replicaSetName) {
+
+ ReplicationEvent event = new ReplicationEvent();
+ event.setOperationType(OperationType.getOperationType(document.getString(OPERATIONTYPE)));
+ event.setTimestamp(document.get(TIMESTAMP, BsonTimestamp.class));
+ event.setH(document.getLong(HASH));
+ event.setV(document.getInteger(VERSION));
+ event.setNamespace(document.getString(NAMESPACE));
+ event.setEventData(Optional.ofNullable(document.get(OPERATION, Document.class)));
+ event.setObjectId(Optional.ofNullable(document.get(OBJECTID, Document.class)));
+ event.setReplicaSetName(replicaSetName);
+ event.setDocument(document);
+ return event;
+ }
+
+}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java b/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java
index 7719b3e..283e9d6 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java
@@ -19,6 +19,7 @@ public class ReplicationEvent {
private String namespace;
private Optional<Document> eventData;
private Optional<Document> objectId;
+ private String replicaSetName;
public ReplicationEvent() {
@@ -136,18 +137,29 @@ public class ReplicationEvent {
this.objectId = objectId;
}
+
+ public void setReplicaSetName(String replicaSetName) {
+ this.replicaSetName = replicaSetName;
+ }
+
+ public String getReplicaSetName() {
+ return replicaSetName;
+ }
+
@Override
public String toString() {
return "ReplicationEvent{" +
- "operationType=" + operationType +
+ "document=" + document +
+ ", operationType=" + operationType +
", v=" + v +
", h=" + h +
", timestamp=" + timestamp +
", databaseName='" + databaseName + '\'' +
", collectionName='" + collectionName + '\'' +
", namespace='" + namespace + '\'' +
- ", eventData=" + eventData.toString() +
- ", objectId=" + objectId.toString() +
+ ", eventData=" + eventData +
+ ", objectId=" + objectId +
+ ", replicaSetName='" + replicaSetName + '\'' +
'}';
}
}
diff --git a/src/test/java/org/apache/connect/mongo/FilterTest.java b/src/test/java/org/apache/connect/mongo/FilterTest.java
index 2b14b13..60e2514 100644
--- a/src/test/java/org/apache/connect/mongo/FilterTest.java
+++ b/src/test/java/org/apache/connect/mongo/FilterTest.java
@@ -17,12 +17,12 @@ import java.util.Map;
public class FilterTest {
- private MongoReplicatorConfig config;
+ private SourceTaskConfig sourceTaskConfig;
private Map<String, List<String>> insterest;
@Before
public void init() {
- config = new MongoReplicatorConfig();
+ sourceTaskConfig = new SourceTaskConfig();
insterest = new HashMap<>();
}
@@ -31,18 +31,18 @@ public class FilterTest {
List<String> collections = new ArrayList<>();
collections.add("person");
insterest.put("test", collections);
- config.setInterestDbAndCollection(JSONObject.toJSONString(insterest));
- Filter filter = new Filter(config);
- Assert.assertTrue(filter.filter(new CollectionMeta("test", "person")));
- Assert.assertFalse(filter.filter(new CollectionMeta("test", "person01")));
+ sourceTaskConfig.setInterestDbAndCollection(JSONObject.toJSONString(insterest));
+ Filter filter = new Filter(sourceTaskConfig);
+ Assert.assertTrue(filter.filterMeta(new CollectionMeta("test", "person")));
+ Assert.assertFalse(filter.filterMeta(new CollectionMeta("test", "person01")));
}
@Test
public void testBlankDb() {
- Filter filter = new Filter(config);
- Assert.assertTrue(filter.filter(new CollectionMeta("test" ,"test")));
- Assert.assertTrue(filter.filter(new CollectionMeta("test1" ,"test01")));
+ Filter filter = new Filter(sourceTaskConfig);
+ Assert.assertTrue(filter.filterMeta(new CollectionMeta("test", "test")));
+ Assert.assertTrue(filter.filterMeta(new CollectionMeta("test1", "test01")));
}
@@ -51,17 +51,17 @@ public class FilterTest {
List<String> collections = new ArrayList<>();
collections.add("*");
insterest.put("test", collections);
- config.setInterestDbAndCollection(JSONObject.toJSONString(insterest));
- Filter filter = new Filter(config);
- Assert.assertTrue(filter.filter(new CollectionMeta("test", "testsad")));
- Assert.assertTrue(filter.filter(new CollectionMeta("test", "tests032")));
+ sourceTaskConfig.setInterestDbAndCollection(JSONObject.toJSONString(insterest));
+ Filter filter = new Filter(sourceTaskConfig);
+ Assert.assertTrue(filter.filterMeta(new CollectionMeta("test", "testsad")));
+ Assert.assertTrue(filter.filterMeta(new CollectionMeta("test", "tests032")));
}
@Test
public void testFilterEvent() {
- Filter filter = new Filter(config);
+ Filter filter = new Filter(sourceTaskConfig);
ReplicationEvent replicationEvent = new ReplicationEvent();
replicationEvent.setOperationType(OperationType.NOOP);
Assert.assertFalse(filter.filterEvent(replicationEvent));
diff --git a/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java b/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java
index 04e1374..d921e63 100644
--- a/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java
+++ b/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java
@@ -1,22 +1,40 @@
package org.apache.connect.mongo;
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SourceDataEntry;
import io.openmessaging.internal.DefaultKeyValue;
-import org.apache.commons.lang3.StringUtils;
import org.apache.connect.mongo.connector.MongoSourceConnector;
import org.apache.connect.mongo.connector.MongoSourceTask;
+import org.apache.connect.mongo.replicator.ReplicaSetConfig;
+import org.apache.connect.mongo.replicator.ReplicaSetsContext;
+import org.apache.connect.mongo.replicator.event.OperationType;
+import org.apache.connect.mongo.replicator.event.ReplicationEvent;
+import org.bson.BsonTimestamp;
+import org.bson.Document;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.LinkedBlockingQueue;
+
public class MongoSourceConnectorTest {
private MongoSourceConnector mongoSourceConnector;
private DefaultKeyValue keyValue;
+ private SourceTaskConfig sourceTaskConfig;
@Before
public void before() {
mongoSourceConnector = new MongoSourceConnector();
keyValue = new DefaultKeyValue();
+ sourceTaskConfig = new SourceTaskConfig();
+
}
@Test
@@ -27,19 +45,58 @@ public class MongoSourceConnectorTest {
@Test
public void verifyConfig() {
- keyValue.put("mongoAddr", "127.0.0.1");
+ keyValue.put("mongoAddr", "shardName=replicaName:127.0.0.1:27017");
String s = mongoSourceConnector.verifyAndSetConfig(keyValue);
- Assert.assertTrue(s.contains("Request config key:"));
- keyValue.put("mongoPort", 27017);
- s = mongoSourceConnector.verifyAndSetConfig(keyValue);
- Assert.assertTrue(StringUtils.isBlank(s));
+ Assert.assertTrue(s.contains("Request sourceTaskConfig key:"));
}
+ @Test
+ public void testPoll() throws Exception {
+ LinkedBlockingQueue<SourceDataEntry> entries = new LinkedBlockingQueue<>();
+ ReplicaSetsContext context = new ReplicaSetsContext(sourceTaskConfig);
+ Field dataEntryQueue = ReplicaSetsContext.class.getDeclaredField("dataEntryQueue");
+ dataEntryQueue.setAccessible(true);
+ dataEntryQueue.set(context, entries);
+ ReplicationEvent event = new ReplicationEvent();
+ event.setOperationType(OperationType.INSERT);
+ event.setNamespace("test.person");
+ event.setTimestamp(new BsonTimestamp(1565609506, 1));
+ event.setDocument(new Document("testKey", "testValue"));
+ event.setH(324243242L);
+ event.setEventData(Optional.ofNullable(new Document("testEventKey", "testEventValue")));
+ event.setObjectId(Optional.empty());
+ context.publishEvent(event, new ReplicaSetConfig("", "testReplicaName", "localhost:27017"));
+ List<SourceDataEntry> sourceDataEntries = (List<SourceDataEntry>) context.poll();
+ Assert.assertTrue(sourceDataEntries.size() == 1);
+
+ SourceDataEntry sourceDataEntry = sourceDataEntries.get(0);
+ Assert.assertEquals("test-person", sourceDataEntry.getQueueName());
+
+ ByteBuffer sourcePartition = sourceDataEntry.getSourcePartition();
+ Assert.assertEquals("testReplicaName", new String(sourcePartition.array()));
+ ByteBuffer sourcePosition = sourceDataEntry.getSourcePosition();
+ ReplicaSetConfig.Position position = JSONObject.parseObject(new String(sourcePosition.array()), ReplicaSetConfig.Position.class);
+ Assert.assertEquals(position.getTimeStamp(), 1565609506);
+ Assert.assertEquals(position.getInc(), 1);
+ Assert.assertEquals(position.isInitSync(), false);
+ EntryType entryType = sourceDataEntry.getEntryType();
+ Assert.assertEquals(EntryType.CREATE, entryType);
+
+ String queueName = sourceDataEntry.getQueueName();
+ Assert.assertEquals("test-person", queueName);
+
+ Schema schema = sourceDataEntry.getSchema();
+ Assert.assertTrue(schema.getFields().size() == 6);
+ Object[] payload = sourceDataEntry.getPayload();
+ Assert.assertTrue(payload.length == 6);
+
+ }
+
}
diff --git a/src/test/java/org/apache/connect/mongo/MongoTest.java b/src/test/java/org/apache/connect/mongo/MongoTest.java
index cc83fbe..7b0291a 100644
--- a/src/test/java/org/apache/connect/mongo/MongoTest.java
+++ b/src/test/java/org/apache/connect/mongo/MongoTest.java
@@ -7,11 +7,14 @@ import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SourceDataEntry;
import org.apache.connect.mongo.initsync.InitSync;
import org.apache.connect.mongo.replicator.Constants;
-import org.apache.connect.mongo.replicator.Filter;
-import org.apache.connect.mongo.replicator.MongoReplicator;
-import org.apache.connect.mongo.replicator.event.DocumentConvertEvent;
+import org.apache.connect.mongo.replicator.ReplicaSet;
+import org.apache.connect.mongo.replicator.ReplicaSetConfig;
+import org.apache.connect.mongo.replicator.ReplicaSetsContext;
+import org.apache.connect.mongo.replicator.event.EventConverter;
import org.apache.connect.mongo.replicator.event.OperationType;
import org.apache.connect.mongo.replicator.event.ReplicationEvent;
import org.bson.BsonTimestamp;
@@ -21,12 +24,8 @@ import org.junit.Before;
import org.junit.Test;
import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.nio.ByteBuffer;
+import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
public class MongoTest {
@@ -36,7 +35,7 @@ public class MongoTest {
@Before
public void before() {
MongoClientSettings.Builder builder = MongoClientSettings.builder();
- builder.applyConnectionString(new ConnectionString("mongodb://127.0.0.1:27018"));
+ builder.applyConnectionString(new ConnectionString("mongodb://127.0.0.1:27077"));
mongoClient = MongoClients.create(builder.build());
}
@@ -52,13 +51,14 @@ public class MongoTest {
Document document = new Document();
document.put("test", "test");
oplog.put(Constants.OPERATION, document);
- ReplicationEvent event = DocumentConvertEvent.convert(oplog);
+ ReplicationEvent event = EventConverter.convert(oplog, "testR");
Assert.assertEquals(timestamp, event.getTimestamp());
Assert.assertEquals("test.person", event.getNamespace());
Assert.assertTrue(11111L == event.getH());
Assert.assertEquals(OperationType.INSERT, event.getOperationType());
Assert.assertEquals(EntryType.CREATE, event.getEntryType());
Assert.assertEquals(document, event.getEventData().get());
+ Assert.assertEquals("testR", event.getReplicaSetName());
}
@@ -68,38 +68,58 @@ public class MongoTest {
public void testInitSyncCopy() throws NoSuchFieldException, IllegalAccessException, InterruptedException {
MongoCollection<Document> collection = mongoClient.getDatabase("test").getCollection("person");
collection.deleteMany(new Document());
- int count = 1;
- List<Document> documents = new ArrayList<>(count);
+ int count = 1000;
+ List<String> documents = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
Document document = new Document();
document.put("name", "test" + i);
document.put("age", i);
document.put("sex", i % 2 == 0 ? "boy" : "girl");
collection.insertOne(document);
- documents.add(document);
+ documents.add(document.getObjectId("_id").toHexString());
}
- MongoReplicatorConfig config = new MongoReplicatorConfig();
+ SourceTaskConfig sourceTaskConfig = new SourceTaskConfig();
Map<String, List<String>> insterest = new HashMap<>();
List<String> collections = new ArrayList<>();
collections.add("*");
insterest.put("test", collections);
- config.setInterestDbAndCollection(JSONObject.toJSONString(insterest));
- MongoReplicator mongoReplicator = new MongoReplicator(config);
- Field running = MongoReplicator.class.getDeclaredField("running");
+ sourceTaskConfig.setInterestDbAndCollection(JSONObject.toJSONString(insterest));
+ ReplicaSetConfig replicaSetConfig = new ReplicaSetConfig("", "test", "localhost");
+ ReplicaSetsContext replicaSetsContext = new ReplicaSetsContext(sourceTaskConfig);
+ ReplicaSet replicaSet = new ReplicaSet(replicaSetConfig, replicaSetsContext);
+ Field running = ReplicaSet.class.getDeclaredField("running");
running.setAccessible(true);
- running.set(mongoReplicator, new AtomicBoolean(true));
- InitSync initSync = new InitSync(config, mongoClient, new Filter(config), mongoReplicator);
+ running.set(replicaSet, new AtomicBoolean(true));
+ InitSync initSync = new InitSync(replicaSetConfig, mongoClient, replicaSetsContext, replicaSet);
initSync.start();
- BlockingQueue<ReplicationEvent> queue = mongoReplicator.getQueue();
- while (count > 0) {
- count--;
- ReplicationEvent event = queue.poll(100, TimeUnit.MILLISECONDS);
- Assert.assertTrue(event.getOperationType().equals(OperationType.CREATED));
- Assert.assertNotNull(event.getDocument());
- Assert.assertTrue(documents.contains(event.getDocument()));
- }
-
+ int syncCount = 0;
+ while (syncCount < count) {
+ Collection<SourceDataEntry> sourceDataEntries = replicaSetsContext.poll();
+ Assert.assertNotNull(sourceDataEntries);
+ for (SourceDataEntry sourceDataEntry : sourceDataEntries) {
+ ByteBuffer sourcePartition = sourceDataEntry.getSourcePartition();
+ Assert.assertEquals("test", new String(sourcePartition.array()));
+ ByteBuffer sourcePosition = sourceDataEntry.getSourcePosition();
+ ReplicaSetConfig.Position position = replicaSetConfig.emptyPosition();
+ position.setInitSync(true);
+ position.setTimeStamp(0);
+ position.setInc(0);
+ Assert.assertEquals(position, JSONObject.parseObject(new String(sourcePosition.array()), ReplicaSetConfig.Position.class));
+ EntryType entryType = sourceDataEntry.getEntryType();
+ Assert.assertEquals(EntryType.CREATE, entryType);
+ String queueName = sourceDataEntry.getQueueName();
+ Assert.assertEquals("test-person", queueName);
+ Schema schema = sourceDataEntry.getSchema();
+ Assert.assertTrue(schema.getFields().size() == 2);
+ Object[] payload = sourceDataEntry.getPayload();
+ Assert.assertTrue(payload.length == 2);
+ Assert.assertEquals(payload[0].toString(), "test.person");
+ Assert.assertTrue(documents.contains(JSONObject.parseObject(payload[1].toString(), Document.class).get("_id", JSONObject.class).getString("$oid")));
+ syncCount++;
+ }
+ }
+ Assert.assertTrue(syncCount == count);
}
}
diff --git a/src/test/java/org/apache/connect/mongo/ReplicaContextTest.java b/src/test/java/org/apache/connect/mongo/ReplicaContextTest.java
new file mode 100644
index 0000000..8613c42
--- /dev/null
+++ b/src/test/java/org/apache/connect/mongo/ReplicaContextTest.java
@@ -0,0 +1,34 @@
+package org.apache.connect.mongo;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.MongoIterable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.connect.mongo.replicator.ReplicaSetConfig;
+import org.apache.connect.mongo.replicator.ReplicaSetsContext;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ReplicaContextTest {
+
+ private ReplicaSetsContext context;
+
+ @Before
+ public void before() {
+ SourceTaskConfig sourceTaskConfig = new SourceTaskConfig();
+ context = new ReplicaSetsContext(sourceTaskConfig);
+ }
+
+
+ @Test
+ public void testCreateMongoClient() {
+ MongoClient mongoClient = context.createMongoClient(new ReplicaSetConfig("shardName1", "rep1", "127.0.0.1:27017"));
+ MongoIterable<String> collectionNames = mongoClient.getDatabase("local").listCollectionNames();
+ MongoCursor<String> iterator = collectionNames.iterator();
+ while (iterator.hasNext()) {
+ Assert.assertTrue(StringUtils.isNoneBlank(iterator.next()));
+ }
+ }
+
+}
diff --git a/src/test/java/org/apache/connect/mongo/ReplicaSetsTest.java b/src/test/java/org/apache/connect/mongo/ReplicaSetsTest.java
new file mode 100644
index 0000000..e69eac6
--- /dev/null
+++ b/src/test/java/org/apache/connect/mongo/ReplicaSetsTest.java
@@ -0,0 +1,65 @@
+package org.apache.connect.mongo;
+
+import org.apache.connect.mongo.replicator.ReplicaSetConfig;
+import org.apache.connect.mongo.replicator.ReplicaSets;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class ReplicaSetsTest {
+
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testCreatReplicaSetsException01() {
+ ReplicaSets.create("");
+ }
+
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testCreatReplicaSetsException02() {
+ ReplicaSets.create("127.0.0.1:27081");
+ }
+
+
+ @Test
+ public void testCreatReplicaSets01() {
+ ReplicaSets replicaSets = ReplicaSets.create("replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083");
+ Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSets.getReplicaConfigByName();
+ Assert.assertTrue(replicaSetConfigMap.size() == 1);
+ Assert.assertNotNull(replicaSetConfigMap.get("replicaName1"));
+ Assert.assertEquals("127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083", replicaSetConfigMap.get("replicaName1").getHost());
+ Assert.assertEquals("replicaName1", replicaSetConfigMap.get("replicaName1").getReplicaSetName());
+ }
+
+
+ @Test
+ public void testCreatReplicaSets02() {
+ ReplicaSets replicaSets = ReplicaSets.create("shardName1=replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083");
+ Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSets.getReplicaConfigByName();
+ Assert.assertTrue(replicaSetConfigMap.size() == 1);
+ Assert.assertNotNull(replicaSetConfigMap.get("replicaName1"));
+ Assert.assertEquals("127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083", replicaSetConfigMap.get("replicaName1").getHost());
+ Assert.assertEquals("replicaName1", replicaSetConfigMap.get("replicaName1").getReplicaSetName());
+ Assert.assertEquals("shardName1", replicaSetConfigMap.get("replicaName1").getShardName());
+ }
+
+
+ @Test
+ public void testCreatReplicaSets03() {
+ ReplicaSets replicaSets = ReplicaSets.create("shardName1=replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083;shardName2=replicaName2/127.0.0.1:27281,127.0.0.1:27282,127.0.0.1:27283");
+ Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSets.getReplicaConfigByName();
+ Assert.assertTrue(replicaSetConfigMap.size() == 2);
+ Assert.assertNotNull(replicaSetConfigMap.get("replicaName1"));
+ Assert.assertEquals("127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083", replicaSetConfigMap.get("replicaName1").getHost());
+ Assert.assertEquals("replicaName1", replicaSetConfigMap.get("replicaName1").getReplicaSetName());
+ Assert.assertEquals("shardName1", replicaSetConfigMap.get("replicaName1").getShardName());
+
+
+ Assert.assertNotNull(replicaSetConfigMap.get("replicaName2"));
+ Assert.assertEquals("127.0.0.1:27281,127.0.0.1:27282,127.0.0.1:27283", replicaSetConfigMap.get("replicaName2").getHost());
+ Assert.assertEquals("replicaName2", replicaSetConfigMap.get("replicaName2").getReplicaSetName());
+ Assert.assertEquals("shardName2", replicaSetConfigMap.get("replicaName2").getShardName());
+ }
+
+}