You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:22:32 UTC
[rocketmq-connect] 09/13: reformat code and add more test case
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit f193fcdc1c654837d9dfee60ea4b2e56afbd6d5d
Author: 李平 <lp...@alibaba-inc.com>
AuthorDate: Thu Aug 15 15:22:58 2019 +0800
reformat code and add more test case
---
README.md | 12 ++
.../org/apache/connect/mongo/SourceTaskConfig.java | 116 +++++++++++++-
.../mongo/connector/MongoSourceConnector.java | 6 +-
.../connect/mongo/connector/MongoSourceTask.java | 26 ++--
.../mongo/connector/builder/MongoDataEntry.java | 39 ++---
.../connect/mongo/initsync/CollectionMeta.java | 6 +-
.../apache/connect/mongo/initsync/InitSync.java | 30 ++--
.../apache/connect/mongo/replicator/Constants.java | 6 -
.../apache/connect/mongo/replicator/Filter.java | 15 +-
.../mongo/replicator/MongoClientFactory.java | 113 ++++++++++++++
.../connect/mongo/replicator/ReplicaSet.java | 11 +-
.../connect/mongo/replicator/ReplicaSetConfig.java | 45 +++---
.../connect/mongo/replicator/ReplicaSets.java | 8 +-
.../mongo/replicator/ReplicaSetsContext.java | 43 +----
.../connect/mongo/replicator/ReplicatorTask.java | 16 +-
.../mongo/replicator/event/EventConverter.java | 13 +-
.../mongo/replicator/event/ReplicationEvent.java | 36 ++---
.../java/org/apache/connect/mongo/FilterTest.java | 15 +-
.../org/apache/connect/mongo/MongoFactoryTest.java | 173 +++++++++++++++++++++
.../connect/mongo/MongoSourceConnectorTest.java | 19 +--
.../apache/connect/mongo/MongoSourceTaskTest.java | 143 +++++++++++++++++
.../java/org/apache/connect/mongo/MongoTest.java | 18 +--
.../apache/connect/mongo/ReplicaContextTest.java | 3 +-
.../org/apache/connect/mongo/ReplicaSetTest.java | 59 +++++++
.../org/apache/connect/mongo/ReplicaSetsTest.java | 19 +--
25 files changed, 758 insertions(+), 232 deletions(-)
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..fb2c6c6
--- /dev/null
+++ b/README.md
@@ -0,0 +1,12 @@
+# RocketMQ-connect-mongo
+
+This is the source connector module for MongoDB; you can run it through the RocketMQ Connector API.
+
+Some JUnit tests rely on a MongoDB database, which you can start in a Docker container:
+
+`docker run -p27027:27017 --name mongo-test -d mongo:4.0.10 --replSet "repl1"`
+
+and then initialize a MongoDB replica set:
+
+Run `docker exec -it mongo-test mongo`, then `rs.initiate()` in the Mongo shell; after that you can run all the JUnit tests.
+
diff --git a/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java b/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java
index b79b2e9..3df9dc4 100644
--- a/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java
+++ b/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java
@@ -1,12 +1,11 @@
package org.apache.connect.mongo;
import io.openmessaging.KeyValue;
-import org.bson.BsonTimestamp;
-
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
+import org.bson.BsonTimestamp;
public class SourceTaskConfig {
@@ -18,15 +17,50 @@ public class SourceTaskConfig {
private String positionTimeStamp;
private String positionInc;
private String dataSync;
+ private String serverSelectionTimeoutMS;
+ private String connectTimeoutMS;
+ private String socketTimeoutMS;
+ private String ssl;
+ private String tsl;
+ private String tlsInsecure;
+ private String sslInvalidHostNameAllowed;
+ private String tlsAllowInvalidHostnames;
+ private String compressors;
+ private String zlibCompressionLevel;
+ private String trustStore;
+ private String trustStorePassword;
private int copyThread = Runtime.getRuntime().availableProcessors();
-
public static final Set<String> REQUEST_CONFIG = Collections.unmodifiableSet(new HashSet<String>() {
{
add("mongoAddr");
}
});
+ public String getTrustStore() {
+ return trustStore;
+ }
+
+ public void setTrustStore(String trustStore) {
+ this.trustStore = trustStore;
+ }
+
+ public String getTrustStorePassword() {
+ return trustStorePassword;
+ }
+
+ public void setTrustStorePassword(String trustStorePassword) {
+ this.trustStorePassword = trustStorePassword;
+ }
+
+ public String getZlibCompressionLevel() {
+ return zlibCompressionLevel;
+ }
+
+ public void setZlibCompressionLevel(String zlibCompressionLevel) {
+ this.zlibCompressionLevel = zlibCompressionLevel;
+ }
+
public String getPositionInc() {
return positionInc;
}
@@ -67,7 +101,6 @@ public class SourceTaskConfig {
this.mongoAddr = mongoAddr;
}
-
public String getMongoUserName() {
return mongoUserName;
}
@@ -84,7 +117,6 @@ public class SourceTaskConfig {
this.mongoPassWord = mongoPassWord;
}
-
public String getDataSync() {
return dataSync;
}
@@ -93,15 +125,86 @@ public class SourceTaskConfig {
this.dataSync = dataSync;
}
-
public String getReplicaSet() {
return replicaSet;
}
+ public String getServerSelectionTimeoutMS() {
+ return serverSelectionTimeoutMS;
+ }
+
+ public void setServerSelectionTimeoutMS(String serverSelectionTimeoutMS) {
+ this.serverSelectionTimeoutMS = serverSelectionTimeoutMS;
+ }
+
public void setReplicaSet(String replicaSet) {
this.replicaSet = replicaSet;
}
+ public String getConnectTimeoutMS() {
+ return connectTimeoutMS;
+ }
+
+ public void setConnectTimeoutMS(String connectTimeoutMS) {
+ this.connectTimeoutMS = connectTimeoutMS;
+ }
+
+ public String getSocketTimeoutMS() {
+ return socketTimeoutMS;
+ }
+
+ public void setSocketTimeoutMS(String socketTimeoutMS) {
+ this.socketTimeoutMS = socketTimeoutMS;
+ }
+
+ public String getSsl() {
+ return ssl;
+ }
+
+ public void setSsl(String ssl) {
+ this.ssl = ssl;
+ }
+
+ public String getTsl() {
+ return tsl;
+ }
+
+ public void setTsl(String tsl) {
+ this.tsl = tsl;
+ }
+
+ public String getTlsInsecure() {
+ return tlsInsecure;
+ }
+
+ public void setTlsInsecure(String tlsInsecure) {
+ this.tlsInsecure = tlsInsecure;
+ }
+
+ public String getSslInvalidHostNameAllowed() {
+ return sslInvalidHostNameAllowed;
+ }
+
+ public void setSslInvalidHostNameAllowed(String sslInvalidHostNameAllowed) {
+ this.sslInvalidHostNameAllowed = sslInvalidHostNameAllowed;
+ }
+
+ public String getTlsAllowInvalidHostnames() {
+ return tlsAllowInvalidHostnames;
+ }
+
+ public void setTlsAllowInvalidHostnames(String tlsAllowInvalidHostnames) {
+ this.tlsAllowInvalidHostnames = tlsAllowInvalidHostnames;
+ }
+
+ public String getCompressors() {
+ return compressors;
+ }
+
+ public void setCompressors(String compressors) {
+ this.compressors = compressors;
+ }
+
public void load(KeyValue props) {
properties2Object(props, this);
@@ -148,7 +251,6 @@ public class SourceTaskConfig {
}
}
-
public BsonTimestamp getPosition() {
return new BsonTimestamp(Integer.valueOf(positionTimeStamp), Integer.valueOf(positionInc));
}
diff --git a/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java b/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java
index 2b28ea2..e3dfb6f 100644
--- a/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java
+++ b/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java
@@ -3,19 +3,17 @@ package org.apache.connect.mongo.connector;
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.Task;
import io.openmessaging.connector.api.source.SourceConnector;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.connect.mongo.SourceTaskConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.List;
-
public class MongoSourceConnector extends SourceConnector {
private Logger logger = LoggerFactory.getLogger(this.getClass());
private KeyValue keyValueConfig;
-
@Override
public String verifyAndSetConfig(KeyValue config) {
for (String requestKey : SourceTaskConfig.REQUEST_CONFIG) {
diff --git a/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
index 407608a..da244cd 100644
--- a/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
+++ b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
@@ -4,16 +4,19 @@ import com.alibaba.fastjson.JSONObject;
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.data.SourceDataEntry;
import io.openmessaging.connector.api.source.SourceTask;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.connect.mongo.SourceTaskConfig;
-import org.apache.connect.mongo.replicator.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.connect.mongo.SourceTaskConfig;
+import org.apache.connect.mongo.replicator.Constants;
+import org.apache.connect.mongo.replicator.ReplicaSet;
+import org.apache.connect.mongo.replicator.ReplicaSetConfig;
+import org.apache.connect.mongo.replicator.ReplicaSets;
+import org.apache.connect.mongo.replicator.ReplicaSetsContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class MongoSourceTask extends SourceTask {
@@ -42,7 +45,7 @@ public class MongoSourceTask extends SourceTask {
replicaSets = ReplicaSets.create(sourceTaskConfig.getMongoAddr());
replicaSets.getReplicaConfigByName().forEach((replicaSetName, replicaSetConfig) -> {
ByteBuffer byteBuffer = this.context.positionStorageReader().getPosition(ByteBuffer.wrap(
- replicaSetName.getBytes()));
+ replicaSetName.getBytes()));
if (byteBuffer != null && byteBuffer.array().length > 0) {
String positionJson = new String(byteBuffer.array(), StandardCharsets.UTF_8);
ReplicaSetConfig.Position position = JSONObject.parseObject(positionJson, ReplicaSetConfig.Position.class);
@@ -50,11 +53,11 @@ public class MongoSourceTask extends SourceTask {
} else {
ReplicaSetConfig.Position position = replicaSetConfig.emptyPosition();
position.setTimeStamp(sourceTaskConfig.getPositionTimeStamp() != null
- && pattern.matcher(sourceTaskConfig.getPositionTimeStamp()).matches()
- ? Integer.valueOf(sourceTaskConfig.getPositionTimeStamp()) : 0);
+ && pattern.matcher(sourceTaskConfig.getPositionTimeStamp()).matches()
+ ? Integer.valueOf(sourceTaskConfig.getPositionTimeStamp()) : 0);
position.setInc(sourceTaskConfig.getPositionInc() != null
- && pattern.matcher(sourceTaskConfig.getPositionInc()).matches()
- ? Integer.valueOf(sourceTaskConfig.getPositionInc()) : 0);
+ && pattern.matcher(sourceTaskConfig.getPositionInc()).matches()
+ ? Integer.valueOf(sourceTaskConfig.getPositionInc()) : 0);
position.setInitSync(StringUtils.equals(sourceTaskConfig.getDataSync(), Constants.INITSYNC) ? true : false);
replicaSetConfig.setPosition(position);
}
@@ -64,7 +67,6 @@ public class MongoSourceTask extends SourceTask {
replicaSet.start();
});
-
} catch (Throwable throwable) {
logger.error("task start error", throwable);
stop();
diff --git a/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java b/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java
index 7c0db7d..87d92ea 100644
--- a/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java
+++ b/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java
@@ -1,19 +1,27 @@
package org.apache.connect.mongo.connector.builder;
-
import com.alibaba.fastjson.JSONObject;
-import io.openmessaging.connector.api.data.*;
+import io.openmessaging.connector.api.data.DataEntryBuilder;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.FieldType;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SourceDataEntry;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import org.apache.connect.mongo.replicator.Constants;
import org.apache.connect.mongo.replicator.ReplicaSetConfig;
import org.apache.connect.mongo.replicator.event.OperationType;
import org.apache.connect.mongo.replicator.event.ReplicationEvent;
import org.bson.BsonTimestamp;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-
-import static org.apache.connect.mongo.replicator.Constants.*;
+import static org.apache.connect.mongo.replicator.Constants.CREATED;
+import static org.apache.connect.mongo.replicator.Constants.NAMESPACE;
+import static org.apache.connect.mongo.replicator.Constants.OBJECTID;
+import static org.apache.connect.mongo.replicator.Constants.OPERATIONTYPE;
+import static org.apache.connect.mongo.replicator.Constants.PATCH;
+import static org.apache.connect.mongo.replicator.Constants.TIMESTAMP;
+import static org.apache.connect.mongo.replicator.Constants.VERSION;
public class MongoDataEntry {
@@ -28,8 +36,8 @@ public class MongoDataEntry {
Schema schema = createdSchema(replicaSetConfig.getReplicaSetName());
dataEntryBuilder = new DataEntryBuilder(schema);
dataEntryBuilder.timestamp(System.currentTimeMillis())
- .queue(event.getNamespace().replace(".", "-").replace("$", "-"))
- .entryType(event.getEntryType());
+ .queue(event.getNamespace().replace(".", "-").replace("$", "-"))
+ .entryType(event.getEntryType());
dataEntryBuilder.putFiled(CREATED, event.getDocument().toJson());
dataEntryBuilder.putFiled(NAMESPACE, event.getNamespace());
@@ -38,8 +46,8 @@ public class MongoDataEntry {
Schema schema = oplogSchema(replicaSetConfig.getReplicaSetName());
dataEntryBuilder = new DataEntryBuilder(schema);
dataEntryBuilder.timestamp(System.currentTimeMillis())
- .queue(event.getNamespace().replace(".", "-").replace("$", "-"))
- .entryType(event.getEntryType());
+ .queue(event.getNamespace().replace(".", "-").replace("$", "-"))
+ .entryType(event.getEntryType());
dataEntryBuilder.putFiled(OPERATIONTYPE, event.getOperationType().name());
dataEntryBuilder.putFiled(TIMESTAMP, event.getTimestamp().getValue());
dataEntryBuilder.putFiled(VERSION, event.getV());
@@ -48,15 +56,13 @@ public class MongoDataEntry {
dataEntryBuilder.putFiled(OBJECTID, event.getObjectId().isPresent() ? JSONObject.toJSONString(event.getObjectId().get()) : "");
}
-
String position = createPosition(event, replicaSetConfig);
SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
- ByteBuffer.wrap(replicaSetConfig.getReplicaSetName().getBytes(StandardCharsets.UTF_8)),
- ByteBuffer.wrap(position.getBytes(StandardCharsets.UTF_8)));
+ ByteBuffer.wrap(replicaSetConfig.getReplicaSetName().getBytes(StandardCharsets.UTF_8)),
+ ByteBuffer.wrap(position.getBytes(StandardCharsets.UTF_8)));
return sourceDataEntry;
}
-
private static String createPosition(ReplicationEvent event, ReplicaSetConfig replicaSetConfig) {
ReplicaSetConfig.Position position = replicaSetConfig.emptyPosition();
BsonTimestamp timestamp = event.getTimestamp();
@@ -76,7 +82,6 @@ public class MongoDataEntry {
return schema;
}
-
private static Schema oplogSchema(String dataSourceName) {
Schema schema = new Schema();
schema.setDataSource(dataSourceName);
@@ -85,7 +90,6 @@ public class MongoDataEntry {
return schema;
}
-
private static void createdField(Schema schema) {
Field namespace = new Field(0, NAMESPACE, FieldType.STRING);
schema.getFields().add(namespace);
@@ -109,5 +113,4 @@ public class MongoDataEntry {
schema.getFields().add(objectId);
}
-
}
diff --git a/src/main/java/org/apache/connect/mongo/initsync/CollectionMeta.java b/src/main/java/org/apache/connect/mongo/initsync/CollectionMeta.java
index 018418c..9c73eac 100644
--- a/src/main/java/org/apache/connect/mongo/initsync/CollectionMeta.java
+++ b/src/main/java/org/apache/connect/mongo/initsync/CollectionMeta.java
@@ -34,8 +34,8 @@ public class CollectionMeta {
@Override
public String toString() {
return "CollectionMeta{" +
- "databaseName='" + databaseName + '\'' +
- ", collectionName='" + collectionName + '\'' +
- '}';
+ "databaseName='" + databaseName + '\'' +
+ ", collectionName='" + collectionName + '\'' +
+ '}';
}
}
diff --git a/src/main/java/org/apache/connect/mongo/initsync/InitSync.java b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
index d92e968..6cdbe06 100644
--- a/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
+++ b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
@@ -3,6 +3,13 @@ package org.apache.connect.mongo.initsync;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoIterable;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.connect.mongo.replicator.ReplicaSet;
import org.apache.connect.mongo.replicator.ReplicaSetConfig;
import org.apache.connect.mongo.replicator.ReplicaSetsContext;
@@ -13,14 +20,6 @@ import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-
public class InitSync {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -34,7 +33,8 @@ public class InitSync {
private CountDownLatch countDownLatch;
private ReplicaSet replicaSet;
- public InitSync(ReplicaSetConfig replicaSetConfig, MongoClient mongoClient, ReplicaSetsContext context, ReplicaSet replicaSet) {
+ public InitSync(ReplicaSetConfig replicaSetConfig, MongoClient mongoClient, ReplicaSetsContext context,
+ ReplicaSet replicaSet) {
this.replicaSetConfig = replicaSetConfig;
this.mongoClient = mongoClient;
this.context = context;
@@ -89,7 +89,6 @@ public class InitSync {
}
-
class CopyRunner implements Runnable {
private MongoClient mongoClient;
@@ -97,7 +96,8 @@ public class InitSync {
private CollectionMeta collectionMeta;
private ReplicaSet replicaSet;
- public CopyRunner(MongoClient mongoClient, CountDownLatch countDownLatch, CollectionMeta collectionMeta, ReplicaSet replicaSet) {
+ public CopyRunner(MongoClient mongoClient, CountDownLatch countDownLatch, CollectionMeta collectionMeta,
+ ReplicaSet replicaSet) {
this.mongoClient = mongoClient;
this.countDownLatch = countDownLatch;
this.collectionMeta = collectionMeta;
@@ -110,10 +110,10 @@ public class InitSync {
int count = 0;
try {
MongoCursor<Document> mongoCursor = mongoClient.getDatabase(collectionMeta.getDatabaseName())
- .getCollection(collectionMeta.getCollectionName())
- .find()
- .batchSize(200)
- .iterator();
+ .getCollection(collectionMeta.getCollectionName())
+ .find()
+ .batchSize(200)
+ .iterator();
while (replicaSet.isRuning() && mongoCursor.hasNext()) {
if (context.initSyncAbort()) {
logger.info("init sync database:{}, collection:{} abort, has copy:{} document", collectionMeta.getDatabaseName(), collectionMeta.getCollectionName(), count);
diff --git a/src/main/java/org/apache/connect/mongo/replicator/Constants.java b/src/main/java/org/apache/connect/mongo/replicator/Constants.java
index 1a91a57..c895bd6 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/Constants.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/Constants.java
@@ -2,13 +2,9 @@ package org.apache.connect.mongo.replicator;
public class Constants {
-
-
public static final String MONGO_LOCAL_DATABASE = "local";
public static final String MONGO_OPLOG_RS = "oplog.rs";
-
-
public static final String OPERATIONTYPE = "op";
public static final String TIMESTAMP = "ts";
public static final String VERSION = "v";
@@ -17,11 +13,9 @@ public class Constants {
public static final String OPERATION = "o";
public static final String OBJECTID = "o2";
-
public static final String CREATED = "created";
public static final String PATCH = "patch";
-
public static final String INITSYNC = "initSync";
}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/Filter.java b/src/main/java/org/apache/connect/mongo/replicator/Filter.java
index 3e431a3..fd26163 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/Filter.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/Filter.java
@@ -1,26 +1,23 @@
package org.apache.connect.mongo.replicator;
-
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.apache.connect.mongo.SourceTaskConfig;
import org.apache.connect.mongo.initsync.CollectionMeta;
import org.apache.connect.mongo.replicator.event.OperationType;
import org.apache.connect.mongo.replicator.event.ReplicationEvent;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-
public class Filter {
private Function<CollectionMeta, Boolean> dbAndCollectionFilter;
private Map<String, List<String>> interestMap = new HashMap<>();
private Function<OperationType, Boolean> notNoopFilter;
-
public Filter(SourceTaskConfig sourceTaskConfig) {
String interestDbAndCollection = sourceTaskConfig.getInterestDbAndCollection();
@@ -33,7 +30,6 @@ public class Filter {
interestMap.put(db, collections);
}
-
}
dbAndCollectionFilter = (collectionMeta) -> {
@@ -56,13 +52,12 @@ public class Filter {
notNoopFilter = (opeartionType) -> opeartionType.ordinal() != OperationType.NOOP.ordinal();
}
-
public boolean filterMeta(CollectionMeta collectionMeta) {
return dbAndCollectionFilter.apply(collectionMeta);
}
public boolean filterEvent(ReplicationEvent event) {
return dbAndCollectionFilter.apply(new CollectionMeta(event.getDatabaseName(), event.getCollectionName()))
- && notNoopFilter.apply(event.getOperationType());
+ && notNoopFilter.apply(event.getOperationType());
}
}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/MongoClientFactory.java b/src/main/java/org/apache/connect/mongo/replicator/MongoClientFactory.java
new file mode 100644
index 0000000..f5e01a3
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/replicator/MongoClientFactory.java
@@ -0,0 +1,113 @@
+package org.apache.connect.mongo.replicator;
+
+import com.mongodb.ConnectionString;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.connect.mongo.SourceTaskConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MongoClientFactory {
+
+ private Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private SourceTaskConfig taskConfig;
+
+ public MongoClientFactory(SourceTaskConfig sourceTaskConfig) {
+ this.taskConfig = sourceTaskConfig;
+ }
+
+ public MongoClient createMongoClient(ReplicaSetConfig replicaSetConfig) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("mongodb://");
+ if (StringUtils.isNotBlank(taskConfig.getMongoUserName())
+ && StringUtils.isNotBlank(taskConfig.getMongoPassWord())) {
+ sb.append(taskConfig.getMongoUserName());
+ sb.append(":");
+ sb.append(taskConfig.getMongoPassWord());
+ sb.append("@");
+
+ }
+ sb.append(replicaSetConfig.getHost());
+ sb.append("/");
+ if (StringUtils.isNotBlank(replicaSetConfig.getReplicaSetName())) {
+ sb.append("?");
+ sb.append("replicaSet=");
+ sb.append(replicaSetConfig.getReplicaSetName());
+ }
+
+ if (StringUtils.isNotBlank(taskConfig.getServerSelectionTimeoutMS())) {
+ sb.append("&");
+ sb.append("serverSelectionTimeoutMS=");
+ sb.append(taskConfig.getServerSelectionTimeoutMS());
+ }
+
+ if (StringUtils.isNotBlank(taskConfig.getConnectTimeoutMS())) {
+ sb.append("&");
+ sb.append("connectTimeoutMS=");
+ sb.append(taskConfig.getConnectTimeoutMS());
+ }
+
+ if (StringUtils.isNotBlank(taskConfig.getSocketTimeoutMS())) {
+ sb.append("&");
+ sb.append("socketTimeoutMS=");
+ sb.append(taskConfig.getSocketTimeoutMS());
+ }
+
+ if (StringUtils.isNotBlank(taskConfig.getSsl()) || StringUtils.isNotBlank(taskConfig.getTsl())) {
+ sb.append("&");
+ sb.append("ssl=");
+ sb.append(true);
+ }
+
+ if (StringUtils.isNotBlank(taskConfig.getTlsInsecure())) {
+ sb.append("&");
+ sb.append("tlsInsecure=");
+ sb.append(taskConfig.getTlsInsecure());
+ }
+
+ if (StringUtils.isNotBlank(taskConfig.getTlsAllowInvalidHostnames())) {
+ sb.append("&");
+ sb.append("tlsAllowInvalidHostnames=");
+ sb.append(taskConfig.getTlsAllowInvalidHostnames());
+ }
+
+ if (StringUtils.isNotBlank(taskConfig.getSslInvalidHostNameAllowed())) {
+ sb.append("&");
+ sb.append("sslInvalidHostNameAllowed=");
+ sb.append(taskConfig.getSslInvalidHostNameAllowed());
+ }
+
+ if (StringUtils.isNotBlank(taskConfig.getCompressors())) {
+ sb.append("&");
+ sb.append("compressors=");
+ sb.append(taskConfig.getCompressors());
+ }
+
+ if (StringUtils.isNotBlank(taskConfig.getZlibCompressionLevel())) {
+ sb.append("&");
+ sb.append("zlibcompressionlevel=");
+ sb.append(taskConfig.getZlibCompressionLevel());
+ }
+
+ if (StringUtils.isNotBlank(taskConfig.getTrustStore())) {
+ Properties properties = System.getProperties();
+ properties.put("javax.net.ssl.trustStore", taskConfig.getTrustStore());
+ logger.info("javax.net.ssl.trustStore: {}", taskConfig.getTrustStore());
+ }
+
+ if (StringUtils.isNotBlank(taskConfig.getTrustStorePassword())) {
+ Properties properties = System.getProperties();
+ properties.put("javax.net.ssl.trustStorePassword", taskConfig.getTrustStorePassword());
+ logger.info("javax.net.ssl.trustStorePassword: {}", taskConfig.getTrustStorePassword());
+ }
+
+ logger.info("connection string :{}", sb.toString());
+ System.out.println(sb.toString());
+ ConnectionString connectionString = new ConnectionString(sb.toString());
+ return MongoClients.create(connectionString);
+ }
+
+}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java
index b141c2b..8f4d0d8 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java
@@ -4,18 +4,16 @@ import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.MongoIterable;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.connect.mongo.replicator.Constants.MONGO_LOCAL_DATABASE;
import static org.apache.connect.mongo.replicator.Constants.MONGO_OPLOG_RS;
-
public class ReplicaSet {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -24,7 +22,6 @@ public class ReplicaSet {
private ReplicaSetConfig replicaSetConfig;
-
private ReplicaSetsContext replicaSetsContext;
private MongoClient mongoClient;
@@ -56,7 +53,6 @@ public class ReplicaSet {
}
}
-
public boolean isReplicaMongo() {
MongoDatabase local = mongoClient.getDatabase(MONGO_LOCAL_DATABASE);
MongoIterable<String> collectionNames = local.listCollectionNames();
@@ -82,7 +78,6 @@ public class ReplicaSet {
}
-
public void pause() {
pause = true;
}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java
index 4b8d148..1b54b17 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java
@@ -1,18 +1,15 @@
package org.apache.connect.mongo.replicator;
-import org.bson.BsonTimestamp;
-
import java.util.Objects;
+import org.bson.BsonTimestamp;
public class ReplicaSetConfig {
-
private String shardName;
private String replicaSetName;
private String host;
private Position position;
-
public Position getPosition() {
return position;
}
@@ -55,13 +52,21 @@ public class ReplicaSetConfig {
return new Position(0, 0, true);
}
+ @Override
+ public String toString() {
+ return "ReplicaSetConfig{" +
+ "shardName='" + shardName + '\'' +
+ ", replicaSetName='" + replicaSetName + '\'' +
+ ", host='" + host + '\'' +
+ ", position=" + position +
+ '}';
+ }
public class Position {
private int timeStamp;
private int inc;
private boolean initSync;
-
public int getTimeStamp() {
return timeStamp;
}
@@ -86,7 +91,6 @@ public class ReplicaSetConfig {
this.initSync = initSync;
}
-
public Position(int timeStamp, int inc, boolean initSync) {
this.timeStamp = timeStamp;
this.inc = inc;
@@ -104,20 +108,22 @@ public class ReplicaSetConfig {
@Override
public String toString() {
return "Position{" +
- "timeStamp=" + timeStamp +
- ", inc=" + inc +
- ", initSync=" + initSync +
- '}';
+ "timeStamp=" + timeStamp +
+ ", inc=" + inc +
+ ", initSync=" + initSync +
+ '}';
}
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
Position position = (Position) o;
return timeStamp == position.timeStamp &&
- inc == position.inc &&
- initSync == position.initSync;
+ inc == position.inc &&
+ initSync == position.initSync;
}
@Override
@@ -125,15 +131,4 @@ public class ReplicaSetConfig {
return Objects.hash(timeStamp, inc, initSync);
}
}
-
-
- @Override
- public String toString() {
- return "ReplicaSetConfig{" +
- "shardName='" + shardName + '\'' +
- ", replicaSetName='" + replicaSetName + '\'' +
- ", host='" + host + '\'' +
- ", position=" + position +
- '}';
- }
}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSets.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSets.java
index 9184b90..af1ebeb 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSets.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSets.java
@@ -1,23 +1,20 @@
package org.apache.connect.mongo.replicator;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.Validate;
-
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.Validate;
public class ReplicaSets {
private static final Pattern HOST_PATTERN = Pattern.compile("((([^=]+)[=])?(([^/]+)\\/))?(.+)");
-
private final Map<String, ReplicaSetConfig> replicaConfigByName = new HashMap<>();
-
public ReplicaSets(Set<ReplicaSetConfig> replicaSetConfigs) {
replicaSetConfigs.forEach(replicaSetConfig -> {
if (StringUtils.isNotBlank(replicaSetConfig.getReplicaSetName())) {
@@ -43,7 +40,6 @@ public class ReplicaSets {
return new ReplicaSets(replicaSetConfigs);
}
-
private static ReplicaSetConfig parseReplicaSetStr(String hosts) {
if (hosts != null) {
Matcher matcher = HOST_PATTERN.matcher(hosts);
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java
index f66ca14..e599f5b 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java
@@ -1,15 +1,7 @@
package org.apache.connect.mongo.replicator;
-import com.mongodb.ConnectionString;
import com.mongodb.client.MongoClient;
-import com.mongodb.client.MongoClients;
import io.openmessaging.connector.api.data.SourceDataEntry;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.connect.mongo.SourceTaskConfig;
-import org.apache.connect.mongo.connector.builder.MongoDataEntry;
-import org.apache.connect.mongo.initsync.CollectionMeta;
-import org.apache.connect.mongo.replicator.event.ReplicationEvent;
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -17,6 +9,10 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.connect.mongo.SourceTaskConfig;
+import org.apache.connect.mongo.connector.builder.MongoDataEntry;
+import org.apache.connect.mongo.initsync.CollectionMeta;
+import org.apache.connect.mongo.replicator.event.ReplicationEvent;
public class ReplicaSetsContext {
@@ -30,57 +26,36 @@ public class ReplicaSetsContext {
private Filter filter;
+ private MongoClientFactory mongoClientFactory;
+
public ReplicaSetsContext(SourceTaskConfig taskConfig) {
this.taskConfig = taskConfig;
this.replicaSets = new CopyOnWriteArrayList<>();
this.dataEntryQueue = new LinkedBlockingDeque<>();
this.filter = new Filter(taskConfig);
+ this.mongoClientFactory = new MongoClientFactory(taskConfig);
}
-
public MongoClient createMongoClient(ReplicaSetConfig replicaSetConfig) {
- StringBuilder sb = new StringBuilder();
- sb.append("mongodb://");
- if (StringUtils.isNotBlank(taskConfig.getMongoUserName())
- && StringUtils.isNotBlank(taskConfig.getMongoPassWord())) {
- sb.append(taskConfig.getMongoUserName());
- sb.append(":");
- sb.append(taskConfig.getMongoPassWord());
- sb.append("@");
-
- }
- sb.append(replicaSetConfig.getHost());
- sb.append("/");
- if (StringUtils.isNotBlank(replicaSetConfig.getReplicaSetName())) {
- sb.append("?");
- sb.append("replicaSet=");
- sb.append(replicaSetConfig.getReplicaSetName());
- }
- ConnectionString connectionString = new ConnectionString(sb.toString());
- return MongoClients.create(connectionString);
+ return mongoClientFactory.createMongoClient(replicaSetConfig);
}
-
public boolean filterEvent(ReplicationEvent event) {
return filter.filterEvent(event);
}
-
public boolean filterMeta(CollectionMeta collectionMeta) {
return filter.filterMeta(collectionMeta);
}
-
public int getCopyThread() {
return taskConfig.getCopyThread() > 0 ? taskConfig.getCopyThread() : Runtime.getRuntime().availableProcessors();
}
-
public void addReplicaSet(ReplicaSet replicaSet) {
this.replicaSets.add(replicaSet);
}
-
public void shutdown() {
replicaSets.forEach(ReplicaSet::shutdown);
}
@@ -89,12 +64,10 @@ public class ReplicaSetsContext {
replicaSets.forEach(ReplicaSet::pause);
}
-
public void resume() {
replicaSets.forEach(ReplicaSet::resume);
}
-
public void publishEvent(ReplicationEvent event, ReplicaSetConfig replicaSetConfig) {
SourceDataEntry sourceDataEntry = MongoDataEntry.createSouceDataEntry(event, replicaSetConfig);
while (true) {
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
index bf4ebac..6cb46d1 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
@@ -13,7 +13,6 @@ import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class ReplicatorTask implements Runnable {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -26,7 +25,8 @@ public class ReplicatorTask implements Runnable {
private ReplicaSetsContext replicaSetsContext;
- public ReplicatorTask(ReplicaSet replicaSet, MongoClient mongoClient, ReplicaSetConfig replicaSetConfig, ReplicaSetsContext replicaSetsContext) {
+ public ReplicatorTask(ReplicaSet replicaSet, MongoClient mongoClient, ReplicaSetConfig replicaSetConfig,
+ ReplicaSetsContext replicaSetsContext) {
this.replicaSet = replicaSet;
this.replicaSetConfig = replicaSetConfig;
this.mongoClient = mongoClient;
@@ -45,15 +45,15 @@ public class ReplicatorTask implements Runnable {
FindIterable<Document> iterable;
if (replicaSetConfig.getPosition().isValid()) {
iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find(
- Filters.gt("ts", replicaSetConfig.getPosition().converBsonTimeStamp()));
+ Filters.gt("ts", replicaSetConfig.getPosition().converBsonTimeStamp()));
} else {
iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find();
}
MongoCursor<Document> cursor = iterable.sort(new Document("$natural", 1))
- .noCursorTimeout(true)
- .cursorType(CursorType.TailableAwait)
- .batchSize(200)
- .iterator();
+ .noCursorTimeout(true)
+ .cursorType(CursorType.TailableAwait)
+ .batchSize(200)
+ .iterator();
while (replicaSet.isRuning()) {
try {
@@ -70,7 +70,6 @@ public class ReplicatorTask implements Runnable {
logger.info("replicaSet:{}, already shutdown, replicaTask end of life cycle", replicaSetConfig);
}
-
private void executorCursor(MongoCursor<Document> cursor) {
while (cursor.hasNext() && !replicaSet.isPause()) {
Document document = cursor.next();
@@ -81,5 +80,4 @@ public class ReplicatorTask implements Runnable {
}
}
-
}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/event/EventConverter.java b/src/main/java/org/apache/connect/mongo/replicator/event/EventConverter.java
index 57b4ac8..1b48990 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/event/EventConverter.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/event/EventConverter.java
@@ -1,16 +1,19 @@
package org.apache.connect.mongo.replicator.event;
+import java.util.Optional;
import org.bson.BsonTimestamp;
import org.bson.Document;
-import java.util.Optional;
-
-import static org.apache.connect.mongo.replicator.Constants.*;
-
+import static org.apache.connect.mongo.replicator.Constants.HASH;
+import static org.apache.connect.mongo.replicator.Constants.NAMESPACE;
+import static org.apache.connect.mongo.replicator.Constants.OBJECTID;
+import static org.apache.connect.mongo.replicator.Constants.OPERATION;
+import static org.apache.connect.mongo.replicator.Constants.OPERATIONTYPE;
+import static org.apache.connect.mongo.replicator.Constants.TIMESTAMP;
+import static org.apache.connect.mongo.replicator.Constants.VERSION;
public class EventConverter {
-
public static ReplicationEvent convert(Document document, String replicaSetName) {
ReplicationEvent event = new ReplicationEvent();
diff --git a/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java b/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java
index 283e9d6..6407781 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java
@@ -1,12 +1,11 @@
package org.apache.connect.mongo.replicator.event;
import io.openmessaging.connector.api.data.EntryType;
+import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.bson.BsonTimestamp;
import org.bson.Document;
-import java.util.Optional;
-
public class ReplicationEvent {
private Document document;
@@ -21,13 +20,12 @@ public class ReplicationEvent {
private Optional<Document> objectId;
private String replicaSetName;
-
public ReplicationEvent() {
}
-
- public ReplicationEvent(OperationType operationType, BsonTimestamp timestamp, Integer v, Long h, String namespace, Optional<Document> eventData, Optional<Document> objectId, Document document) {
+ public ReplicationEvent(OperationType operationType, BsonTimestamp timestamp, Integer v, Long h, String namespace,
+ Optional<Document> eventData, Optional<Document> objectId, Document document) {
this.operationType = operationType;
this.v = v;
this.h = h;
@@ -41,7 +39,6 @@ public class ReplicationEvent {
this.document = document;
}
-
public OperationType getOperationType() {
return operationType;
}
@@ -91,12 +88,10 @@ public class ReplicationEvent {
}
}
-
public void setOperationType(OperationType operationType) {
this.operationType = operationType;
}
-
public Document getDocument() {
return document;
}
@@ -137,7 +132,6 @@ public class ReplicationEvent {
this.objectId = objectId;
}
-
public void setReplicaSetName(String replicaSetName) {
this.replicaSetName = replicaSetName;
}
@@ -149,17 +143,17 @@ public class ReplicationEvent {
@Override
public String toString() {
return "ReplicationEvent{" +
- "document=" + document +
- ", operationType=" + operationType +
- ", v=" + v +
- ", h=" + h +
- ", timestamp=" + timestamp +
- ", databaseName='" + databaseName + '\'' +
- ", collectionName='" + collectionName + '\'' +
- ", namespace='" + namespace + '\'' +
- ", eventData=" + eventData +
- ", objectId=" + objectId +
- ", replicaSetName='" + replicaSetName + '\'' +
- '}';
+ "document=" + document +
+ ", operationType=" + operationType +
+ ", v=" + v +
+ ", h=" + h +
+ ", timestamp=" + timestamp +
+ ", databaseName='" + databaseName + '\'' +
+ ", collectionName='" + collectionName + '\'' +
+ ", namespace='" + namespace + '\'' +
+ ", eventData=" + eventData +
+ ", objectId=" + objectId +
+ ", replicaSetName='" + replicaSetName + '\'' +
+ '}';
}
}
diff --git a/src/test/java/org/apache/connect/mongo/FilterTest.java b/src/test/java/org/apache/connect/mongo/FilterTest.java
index 60e2514..a804b8b 100644
--- a/src/test/java/org/apache/connect/mongo/FilterTest.java
+++ b/src/test/java/org/apache/connect/mongo/FilterTest.java
@@ -1,6 +1,10 @@
package org.apache.connect.mongo;
import com.alibaba.fastjson.JSONObject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.connect.mongo.initsync.CollectionMeta;
import org.apache.connect.mongo.replicator.Filter;
import org.apache.connect.mongo.replicator.event.OperationType;
@@ -9,14 +13,8 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
public class FilterTest {
-
private SourceTaskConfig sourceTaskConfig;
private Map<String, List<String>> insterest;
@@ -37,7 +35,6 @@ public class FilterTest {
Assert.assertFalse(filter.filterMeta(new CollectionMeta("test", "person01")));
}
-
@Test
public void testBlankDb() {
Filter filter = new Filter(sourceTaskConfig);
@@ -45,7 +42,6 @@ public class FilterTest {
Assert.assertTrue(filter.filterMeta(new CollectionMeta("test1", "test01")));
}
-
@Test
public void testAsterisk() {
List<String> collections = new ArrayList<>();
@@ -57,8 +53,6 @@ public class FilterTest {
Assert.assertTrue(filter.filterMeta(new CollectionMeta("test", "tests032")));
}
-
-
@Test
public void testFilterEvent() {
Filter filter = new Filter(sourceTaskConfig);
@@ -69,5 +63,4 @@ public class FilterTest {
Assert.assertTrue(filter.filterEvent(replicationEvent));
}
-
}
diff --git a/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java b/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java
new file mode 100644
index 0000000..93adeeb
--- /dev/null
+++ b/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java
@@ -0,0 +1,173 @@
+package org.apache.connect.mongo;
+
+import com.mongodb.MongoClientSettings;
+import com.mongodb.MongoTimeoutException;
+import com.mongodb.client.internal.MongoClientImpl;
+import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.connect.mongo.replicator.MongoClientFactory;
+import org.apache.connect.mongo.replicator.ReplicaSetConfig;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link MongoClientFactory}: every test sets one option on the
+ * {@link SourceTaskConfig}, asks the factory for a client and checks that the option is
+ * reflected in the {@link MongoClientSettings} the client was built with.
+ *
+ * <p>The settings object is read reflectively from the {@code settings} field of the driver's
+ * {@code MongoClientImpl}; the assertions only inspect that object.
+ */
+public class MongoFactoryTest {
+
+    /** Server selection timeout used by the timeout test, in milliseconds. */
+    private static final long SERVER_SELECTION_TIMEOUT_MS = 150L;
+
+    private ReplicaSetConfig replicaSetConfig;
+
+    private SourceTaskConfig sourceTaskConfig;
+
+    private MongoClientFactory mongoClientFactory;
+
+    /** Client created by the latest {@link #getSettings()} call, or {@code null} if none is open. */
+    private MongoClientImpl client;
+
+    @Before
+    public void before() {
+        this.replicaSetConfig = new ReplicaSetConfig("shardName1", "rep1", "127.0.0.1:27027");
+        this.sourceTaskConfig = new SourceTaskConfig();
+        this.mongoClientFactory = new MongoClientFactory(sourceTaskConfig);
+    }
+
+    @After
+    public void after() {
+        // A test may fail before any client was created, so the close must be null-safe.
+        closeClient();
+    }
+
+    @Test
+    public void testCreateMongoClientWithSSL() {
+        sourceTaskConfig.setSsl("ssl");
+        Assert.assertTrue(getSettings().getSslSettings().isEnabled());
+    }
+
+    @Test
+    public void testCreateMongoClientWithTSL() {
+        sourceTaskConfig.setTsl("tsl");
+        Assert.assertTrue(getSettings().getSslSettings().isEnabled());
+    }
+
+    @Test
+    public void testCreateMongoClientWithserverSelectionTimeoutMS() {
+        try {
+            replicaSetConfig.setReplicaSetName("testReplicatSet");
+            sourceTaskConfig.setServerSelectionTimeoutMS(String.valueOf(SERVER_SELECTION_TIMEOUT_MS));
+            Assert.assertEquals(SERVER_SELECTION_TIMEOUT_MS,
+                getSettings().getClusterSettings().getServerSelectionTimeout(TimeUnit.MILLISECONDS));
+        } catch (MongoTimeoutException exception) {
+            // The driver reports the configured timeout in its message, so the expected text is
+            // derived from the same constant instead of a hard-coded (and previously stale) value.
+            Assert.assertTrue(StringUtils.startsWith(exception.getMessage(),
+                "Timed out after " + SERVER_SELECTION_TIMEOUT_MS + " ms while waiting for a server that matches"));
+        }
+    }
+
+    @Test
+    public void testCreateMongoClientWithConnectTimeoutMS() {
+        sourceTaskConfig.setConnectTimeoutMS("1200");
+        Assert.assertEquals(1200, getSettings().getSocketSettings().getConnectTimeout(TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    public void testCreateMongoClientWithSocketTimeoutMS() {
+        sourceTaskConfig.setSocketTimeoutMS("1100");
+        Assert.assertEquals(1100, getSettings().getSocketSettings().getReadTimeout(TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    public void testCreateMongoClientWithInvalidHostNameAllowed() {
+        sourceTaskConfig.setSslInvalidHostNameAllowed("true");
+        Assert.assertTrue(getSettings().getSslSettings().isInvalidHostNameAllowed());
+
+        sourceTaskConfig.setSslInvalidHostNameAllowed("false");
+        Assert.assertFalse(getSettings().getSslSettings().isInvalidHostNameAllowed());
+    }
+
+    @Test
+    public void testCreateMongoClientWithInvalidHostNameAllowedTsl() {
+        sourceTaskConfig.setTlsAllowInvalidHostnames("true");
+        Assert.assertTrue(getSettings().getSslSettings().isInvalidHostNameAllowed());
+
+        sourceTaskConfig.setTlsAllowInvalidHostnames("false");
+        Assert.assertFalse(getSettings().getSslSettings().isInvalidHostNameAllowed());
+    }
+
+    @Test
+    public void testCreateMongoClientWithTlsinsecure() {
+        sourceTaskConfig.setTlsInsecure("true");
+        Assert.assertTrue(getSettings().getSslSettings().isInvalidHostNameAllowed());
+
+        sourceTaskConfig.setTlsInsecure("false");
+        Assert.assertFalse(getSettings().getSslSettings().isInvalidHostNameAllowed());
+    }
+
+    @Test
+    public void testCreateMongoClientWithCompression() {
+        sourceTaskConfig.setCompressors("zlib");
+        Assert.assertEquals("zlib", getSettings().getCompressorList().get(0).getName());
+    }
+
+    @Test
+    public void testCreateMongoClientWithCompressionLevel() {
+        sourceTaskConfig.setCompressors("zlib");
+        sourceTaskConfig.setZlibCompressionLevel("7");
+        MongoClientSettings settings = getSettings();
+        Assert.assertEquals("zlib", settings.getCompressorList().get(0).getName());
+        // Unbox into an int first so assertEquals resolves to the primitive overload.
+        int level = settings.getCompressorList().get(0).getProperty("level", 0);
+        Assert.assertEquals(7, level);
+    }
+
+    @Test
+    public void testCreateMongoClientWithAuth() {
+        sourceTaskConfig.setMongoUserName("test");
+        sourceTaskConfig.setMongoPassWord("123456");
+        MongoClientSettings settings = getSettings();
+        Assert.assertEquals("admin", settings.getCredential().getSource());
+        Assert.assertEquals("test", settings.getCredential().getUserName());
+        Assert.assertEquals("123456", new String(settings.getCredential().getPassword()));
+    }
+
+    /**
+     * Builds a client from the current configuration and returns the settings it was created with.
+     *
+     * <p>Any client left over from an earlier call is closed first, so a test that calls this
+     * method several times keeps at most one client open. Runtime exceptions raised while
+     * creating the client propagate unchanged so the test fails with the real cause.
+     *
+     * @return the settings held by the freshly created client
+     * @throws IllegalStateException if the {@code settings} field cannot be read reflectively
+     */
+    private MongoClientSettings getSettings() {
+        closeClient();
+        try {
+            client = (MongoClientImpl) mongoClientFactory.createMongoClient(replicaSetConfig);
+            Field field = MongoClientImpl.class.getDeclaredField("settings");
+            field.setAccessible(true);
+            return (MongoClientSettings) field.get(client);
+        } catch (ReflectiveOperationException e) {
+            throw new IllegalStateException("Unable to read the settings field of MongoClientImpl", e);
+        }
+    }
+
+    /** Closes the currently tracked client, if there is one. */
+    private void closeClient() {
+        if (client != null) {
+            client.close();
+            client = null;
+        }
+    }
+
+}
diff --git a/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java b/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java
index 1f6227b..f85e4a9 100644
--- a/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java
+++ b/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java
@@ -5,6 +5,11 @@ import io.openmessaging.connector.api.data.EntryType;
import io.openmessaging.connector.api.data.Schema;
import io.openmessaging.connector.api.data.SourceDataEntry;
import io.openmessaging.internal.DefaultKeyValue;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.connect.mongo.connector.MongoSourceConnector;
import org.apache.connect.mongo.connector.MongoSourceTask;
import org.apache.connect.mongo.replicator.ReplicaSetConfig;
@@ -17,12 +22,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.lang.reflect.Field;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.LinkedBlockingQueue;
-
public class MongoSourceConnectorTest {
private MongoSourceConnector mongoSourceConnector;
@@ -42,14 +41,12 @@ public class MongoSourceConnectorTest {
Assert.assertEquals(mongoSourceConnector.taskClass(), MongoSourceTask.class);
}
-
@Test
public void verifyConfig() {
String s = mongoSourceConnector.verifyAndSetConfig(keyValue);
Assert.assertTrue(s.contains("Request config key:"));
}
-
@Test
public void testPoll() throws Exception {
LinkedBlockingQueue<SourceDataEntry> entries = new LinkedBlockingQueue<>();
@@ -65,25 +62,22 @@ public class MongoSourceConnectorTest {
event.setH(324243242L);
event.setEventData(Optional.ofNullable(new Document("testEventKey", "testEventValue")));
event.setObjectId(Optional.empty());
- context.publishEvent(event, new ReplicaSetConfig("", "testReplicaName", "localhost:27017"));
+ context.publishEvent(event, new ReplicaSetConfig("", "testReplicaName", "localhost:27027"));
List<SourceDataEntry> sourceDataEntries = (List<SourceDataEntry>) context.poll();
Assert.assertTrue(sourceDataEntries.size() == 1);
SourceDataEntry sourceDataEntry = sourceDataEntries.get(0);
Assert.assertEquals("test-person", sourceDataEntry.getQueueName());
-
ByteBuffer sourcePartition = sourceDataEntry.getSourcePartition();
Assert.assertEquals("testReplicaName", new String(sourcePartition.array()));
-
ByteBuffer sourcePosition = sourceDataEntry.getSourcePosition();
ReplicaSetConfig.Position position = JSONObject.parseObject(new String(sourcePosition.array()), ReplicaSetConfig.Position.class);
Assert.assertEquals(position.getTimeStamp(), 1565609506);
Assert.assertEquals(position.getInc(), 1);
Assert.assertEquals(position.isInitSync(), false);
-
EntryType entryType = sourceDataEntry.getEntryType();
Assert.assertEquals(EntryType.CREATE, entryType);
@@ -97,5 +91,4 @@ public class MongoSourceConnectorTest {
}
-
}
diff --git a/src/test/java/org/apache/connect/mongo/MongoSourceTaskTest.java b/src/test/java/org/apache/connect/mongo/MongoSourceTaskTest.java
new file mode 100644
index 0000000..b696393
--- /dev/null
+++ b/src/test/java/org/apache/connect/mongo/MongoSourceTaskTest.java
@@ -0,0 +1,143 @@
+package org.apache.connect.mongo;
+
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.PositionStorageReader;
+import io.openmessaging.connector.api.source.SourceTask;
+import io.openmessaging.connector.api.source.SourceTaskContext;
+import io.openmessaging.internal.DefaultKeyValue;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.connect.mongo.connector.MongoSourceTask;
+import org.apache.connect.mongo.replicator.Constants;
+import org.apache.connect.mongo.replicator.ReplicaSet;
+import org.apache.connect.mongo.replicator.ReplicaSetConfig;
+import org.apache.connect.mongo.replicator.ReplicaSetsContext;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests how {@link MongoSourceTask#start} derives the replication position of a replica set,
+ * once from the task configuration alone and once from a position handed out by the
+ * {@link PositionStorageReader} of the task context.
+ *
+ * <p>The resulting {@link ReplicaSetConfig} is not exposed by the task, so it is read
+ * reflectively through the {@code replicaSetsContext}, {@code replicaSets} and
+ * {@code replicaSetConfig} fields.
+ */
+public class MongoSourceTaskTest {
+
+    @Test
+    public void testEmptyContextStart() throws NoSuchFieldException, IllegalAccessException {
+        DefaultKeyValue defaultKeyValue = new DefaultKeyValue();
+        defaultKeyValue.put("mongoAddr", "test/127.0.0.1:27027");
+        defaultKeyValue.put("positionTimeStamp", "11111111");
+        defaultKeyValue.put("positionInc", "111");
+        defaultKeyValue.put("serverSelectionTimeoutMS", "10");
+        defaultKeyValue.put("dataSync", Constants.INITSYNC);
+
+        MongoSourceTask mongoSourceTask = startTask(defaultKeyValue, emptyTaskContext());
+        try {
+            ReplicaSetConfig config = singleReplicaSetConfig(mongoSourceTask);
+            assertNameAndHost(config);
+            Assert.assertEquals(11111111, config.getPosition().getTimeStamp());
+            Assert.assertEquals(111, config.getPosition().getInc());
+            Assert.assertTrue(config.getPosition().isInitSync());
+        } finally {
+            // A started task must be stopped, otherwise its replication work outlives the test.
+            mongoSourceTask.stop();
+        }
+    }
+
+    @Test
+    public void testContextStart() throws NoSuchFieldException, IllegalAccessException {
+        DefaultKeyValue defaultKeyValue = new DefaultKeyValue();
+        defaultKeyValue.put("mongoAddr", "test/127.0.0.1:27027");
+        defaultKeyValue.put("serverSelectionTimeoutMS", "10");
+
+        MongoSourceTask mongoSourceTask = startTask(defaultKeyValue, storedPositionTaskContext());
+        try {
+            ReplicaSetConfig config = singleReplicaSetConfig(mongoSourceTask);
+            assertNameAndHost(config);
+            Assert.assertEquals(22222222, config.getPosition().getTimeStamp());
+            Assert.assertEquals(222, config.getPosition().getInc());
+            Assert.assertFalse(config.getPosition().isInitSync());
+        } finally {
+            mongoSourceTask.stop();
+        }
+    }
+
+    /**
+     * Creates a task, injects {@code taskContext} into the {@code context} field declared by
+     * {@link SourceTask} and starts the task with {@code config}.
+     */
+    private static MongoSourceTask startTask(KeyValue config, SourceTaskContext taskContext)
+        throws NoSuchFieldException, IllegalAccessException {
+        MongoSourceTask mongoSourceTask = new MongoSourceTask();
+        Field context = SourceTask.class.getDeclaredField("context");
+        context.setAccessible(true);
+        context.set(mongoSourceTask, taskContext);
+        mongoSourceTask.start(config);
+        return mongoSourceTask;
+    }
+
+    /**
+     * Asserts that the started task registered exactly one replica set and returns its config.
+     */
+    private static ReplicaSetConfig singleReplicaSetConfig(MongoSourceTask mongoSourceTask)
+        throws NoSuchFieldException, IllegalAccessException {
+        ReplicaSetsContext setsContext =
+            (ReplicaSetsContext) readField(MongoSourceTask.class, "replicaSetsContext", mongoSourceTask);
+        // The field is declared as a list of ReplicaSet; the cast cannot be checked at runtime.
+        @SuppressWarnings("unchecked")
+        List<ReplicaSet> replicaSetList =
+            (List<ReplicaSet>) readField(ReplicaSetsContext.class, "replicaSets", setsContext);
+        Assert.assertEquals(1, replicaSetList.size());
+        return (ReplicaSetConfig) readField(ReplicaSet.class, "replicaSetConfig", replicaSetList.get(0));
+    }
+
+    /** Reads the value of the (possibly private) field {@code name} declared by {@code owner}. */
+    private static Object readField(Class<?> owner, String name, Object target)
+        throws NoSuchFieldException, IllegalAccessException {
+        Field field = owner.getDeclaredField(name);
+        field.setAccessible(true);
+        return field.get(target);
+    }
+
+    /** Checks the parts of the config that both tests derive from the same {@code mongoAddr}. */
+    private static void assertNameAndHost(ReplicaSetConfig config) {
+        Assert.assertTrue("unexpected replica set name: " + config.getReplicaSetName(),
+            StringUtils.equals(config.getReplicaSetName(), "test"));
+        Assert.assertTrue("unexpected host: " + config.getHost(),
+            StringUtils.equals(config.getHost(), "127.0.0.1:27027"));
+    }
+
+    /** Task context whose position reader has no stored position for any partition. */
+    private SourceTaskContext emptyTaskContext() {
+        return taskContext(null);
+    }
+
+    /** Task context whose position reader always returns timeStamp 22222222, inc 222, no init sync. */
+    private SourceTaskContext storedPositionTaskContext() {
+        Map<String, Object> po = new HashMap<>();
+        po.put("timeStamp", 22222222);
+        po.put("inc", 222);
+        po.put("initSync", false);
+        // Explicit charset: the no-argument getBytes() depends on the platform default.
+        return taskContext(JSONObject.toJSONString(po).getBytes(java.nio.charset.StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Builds a minimal task context stub.
+     *
+     * @param positionJson serialized position returned for every partition, or {@code null}
+     *                     to report that nothing is stored
+     */
+    private static SourceTaskContext taskContext(final byte[] positionJson) {
+        return new SourceTaskContext() {
+            @Override
+            public PositionStorageReader positionStorageReader() {
+                return new PositionStorageReader() {
+                    @Override
+                    public ByteBuffer getPosition(ByteBuffer partition) {
+                        // Wrap on every call so each caller gets a buffer with a fresh read position.
+                        return positionJson == null ? null : ByteBuffer.wrap(positionJson);
+                    }
+
+                    @Override
+                    public Map<ByteBuffer, ByteBuffer> getPositions(Collection<ByteBuffer> partitions) {
+                        return null;
+                    }
+                };
+            }
+
+            @Override
+            public KeyValue configs() {
+                return null;
+            }
+        };
+    }
+}
diff --git a/src/test/java/org/apache/connect/mongo/MongoTest.java b/src/test/java/org/apache/connect/mongo/MongoTest.java
index 7b0291a..98e9a42 100644
--- a/src/test/java/org/apache/connect/mongo/MongoTest.java
+++ b/src/test/java/org/apache/connect/mongo/MongoTest.java
@@ -9,6 +9,14 @@ import com.mongodb.client.MongoCollection;
import io.openmessaging.connector.api.data.EntryType;
import io.openmessaging.connector.api.data.Schema;
import io.openmessaging.connector.api.data.SourceDataEntry;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.connect.mongo.initsync.InitSync;
import org.apache.connect.mongo.replicator.Constants;
import org.apache.connect.mongo.replicator.ReplicaSet;
@@ -23,11 +31,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.lang.reflect.Field;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-
public class MongoTest {
private MongoClient mongoClient;
@@ -35,11 +38,10 @@ public class MongoTest {
@Before
public void before() {
MongoClientSettings.Builder builder = MongoClientSettings.builder();
- builder.applyConnectionString(new ConnectionString("mongodb://127.0.0.1:27077"));
+ builder.applyConnectionString(new ConnectionString("mongodb://127.0.0.1:27027"));
mongoClient = MongoClients.create(builder.build());
}
-
@Test
public void testConvertEvent() {
Document oplog = new Document();
@@ -60,10 +62,8 @@ public class MongoTest {
Assert.assertEquals(document, event.getEventData().get());
Assert.assertEquals("testR", event.getReplicaSetName());
-
}
-
@Test
public void testInitSyncCopy() throws NoSuchFieldException, IllegalAccessException, InterruptedException {
MongoCollection<Document> collection = mongoClient.getDatabase("test").getCollection("person");
diff --git a/src/test/java/org/apache/connect/mongo/ReplicaContextTest.java b/src/test/java/org/apache/connect/mongo/ReplicaContextTest.java
index 8613c42..16cb959 100644
--- a/src/test/java/org/apache/connect/mongo/ReplicaContextTest.java
+++ b/src/test/java/org/apache/connect/mongo/ReplicaContextTest.java
@@ -20,10 +20,9 @@ public class ReplicaContextTest {
context = new ReplicaSetsContext(sourceTaskConfig);
}
-
@Test
public void testCreateMongoClient() {
- MongoClient mongoClient = context.createMongoClient(new ReplicaSetConfig("shardName1", "rep1", "127.0.0.1:27017"));
+ MongoClient mongoClient = context.createMongoClient(new ReplicaSetConfig("shardName1", "", "127.0.0.1:27027"));
MongoIterable<String> collectionNames = mongoClient.getDatabase("local").listCollectionNames();
MongoCursor<String> iterator = collectionNames.iterator();
while (iterator.hasNext()) {
diff --git a/src/test/java/org/apache/connect/mongo/ReplicaSetTest.java b/src/test/java/org/apache/connect/mongo/ReplicaSetTest.java
new file mode 100644
index 0000000..07eefae
--- /dev/null
+++ b/src/test/java/org/apache/connect/mongo/ReplicaSetTest.java
@@ -0,0 +1,59 @@
+package org.apache.connect.mongo;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.connect.mongo.replicator.ReplicaSet;
+import org.apache.connect.mongo.replicator.ReplicaSetConfig;
+import org.apache.connect.mongo.replicator.ReplicaSetsContext;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests the lifecycle switches of {@link ReplicaSet}: start/shutdown and pause/resume.
+ *
+ * <p>The state is checked by reading the private {@code running} and {@code pause} fields
+ * reflectively.
+ */
+public class ReplicaSetTest {
+
+    private ReplicaSet replicaSet;
+
+    private SourceTaskConfig sourceTaskConfig;
+
+    private ReplicaSetConfig replicaSetConfig;
+
+    private ReplicaSetsContext replicaSetsContext;
+
+    @Before
+    public void before() {
+        this.sourceTaskConfig = new SourceTaskConfig();
+        this.replicaSetConfig = new ReplicaSetConfig("shardName1", "", "127.0.0.1:27027");
+        this.replicaSetsContext = new ReplicaSetsContext(sourceTaskConfig);
+        this.replicaSet = new ReplicaSet(replicaSetConfig, replicaSetsContext);
+    }
+
+    @Test
+    public void testStartAndShutDown() throws NoSuchFieldException, IllegalAccessException {
+        replicaSet.start();
+        AtomicBoolean running;
+        try {
+            running = (AtomicBoolean) readField("running");
+            Assert.assertTrue(running.get());
+        } finally {
+            // Shut down even when the assertion above fails, so a started replica set never leaks.
+            replicaSet.shutdown();
+        }
+        Assert.assertFalse(running.get());
+    }
+
+    @Test
+    public void testPause() throws Exception {
+        replicaSet.pause();
+        Assert.assertTrue(isPaused());
+    }
+
+    @Test
+    public void testResume() throws Exception {
+        // Pause first: resuming a replica set that was never paused would not prove anything.
+        replicaSet.pause();
+        Assert.assertTrue(isPaused());
+        replicaSet.resume();
+        Assert.assertFalse(isPaused());
+    }
+
+    /** Returns the current value of the private {@code pause} field. */
+    private boolean isPaused() throws NoSuchFieldException, IllegalAccessException {
+        return (boolean) readField("pause");
+    }
+
+    /** Reads the private field {@code name} of the replica set under test. */
+    private Object readField(String name) throws NoSuchFieldException, IllegalAccessException {
+        Field field = ReplicaSet.class.getDeclaredField(name);
+        field.setAccessible(true);
+        return field.get(replicaSet);
+    }
+
+}
diff --git a/src/test/java/org/apache/connect/mongo/ReplicaSetsTest.java b/src/test/java/org/apache/connect/mongo/ReplicaSetsTest.java
index e69eac6..5276f4f 100644
--- a/src/test/java/org/apache/connect/mongo/ReplicaSetsTest.java
+++ b/src/test/java/org/apache/connect/mongo/ReplicaSetsTest.java
@@ -1,29 +1,25 @@
package org.apache.connect.mongo;
+import java.util.Map;
import org.apache.connect.mongo.replicator.ReplicaSetConfig;
import org.apache.connect.mongo.replicator.ReplicaSets;
import org.junit.Assert;
import org.junit.Test;
-import java.util.Map;
-
public class ReplicaSetsTest {
-
+ // An empty address string passed to ReplicaSets.create is expected to raise IllegalArgumentException.
@Test(expected = IllegalArgumentException.class)
- public void testCreatReplicaSetsException01() {
+ public void testCreatReplicaSetsExceptionWithoutMongoAddr() {
ReplicaSets.create("");
}
-
+ // A bare host:port with no "replicaSetName/" prefix is expected to raise IllegalArgumentException.
@Test(expected = IllegalArgumentException.class)
- public void testCreatReplicaSetsException02() {
+ public void testCreatReplicaSetsExceptioWithoutReplicaSetName() {
ReplicaSets.create("127.0.0.1:27081");
}
-
@Test
- public void testCreatReplicaSets01() {
+ public void testCreatReplicaSetsSpecialReplicaSetName() {
ReplicaSets replicaSets = ReplicaSets.create("replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083");
Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSets.getReplicaConfigByName();
Assert.assertTrue(replicaSetConfigMap.size() == 1);
@@ -32,9 +28,8 @@ public class ReplicaSetsTest {
Assert.assertEquals("replicaName1", replicaSetConfigMap.get("replicaName1").getReplicaSetName());
}
-
@Test
- public void testCreatReplicaSets02() {
+ public void testCreatReplicaSetsSpecialShardNameAndReplicaSetName() {
ReplicaSets replicaSets = ReplicaSets.create("shardName1=replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083");
Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSets.getReplicaConfigByName();
Assert.assertTrue(replicaSetConfigMap.size() == 1);
@@ -44,9 +39,8 @@ public class ReplicaSetsTest {
Assert.assertEquals("shardName1", replicaSetConfigMap.get("replicaName1").getShardName());
}
-
@Test
- public void testCreatReplicaSets03() {
+ public void testCreatReplicaSetsMutiMongoAddr() {
ReplicaSets replicaSets = ReplicaSets.create("shardName1=replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083;shardName2=replicaName2/127.0.0.1:27281,127.0.0.1:27282,127.0.0.1:27283");
Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSets.getReplicaConfigByName();
Assert.assertTrue(replicaSetConfigMap.size() == 2);
@@ -55,7 +49,6 @@ public class ReplicaSetsTest {
Assert.assertEquals("replicaName1", replicaSetConfigMap.get("replicaName1").getReplicaSetName());
Assert.assertEquals("shardName1", replicaSetConfigMap.get("replicaName1").getShardName());
-
Assert.assertNotNull(replicaSetConfigMap.get("replicaName2"));
Assert.assertEquals("127.0.0.1:27281,127.0.0.1:27282,127.0.0.1:27283", replicaSetConfigMap.get("replicaName2").getHost());
Assert.assertEquals("replicaName2", replicaSetConfigMap.get("replicaName2").getReplicaSetName());