You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:22:34 UTC
[rocketmq-connect] 11/13: clean code
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit db51f7ff808a10209ff7eafb68cec0e08f25f058
Author: 李平 <17...@qq.com>
AuthorDate: Mon Aug 26 14:20:51 2019 +0800
clean code
---
README.md | 17 +++++++--
.../apache/connect/mongo/initsync/InitSync.java | 3 +-
.../{Filter.java => OperationFilter.java} | 4 +-
.../mongo/replicator/ReplicaSetManager.java | 2 +-
.../mongo/replicator/ReplicaSetsContext.java | 10 ++---
.../connect/mongo/replicator/ReplicatorTask.java | 25 ++++++------
.../java/org/apache/connect/mongo/FilterTest.java | 26 ++++++-------
.../org/apache/connect/mongo/MongoFactoryTest.java | 44 ++++++++++++----------
8 files changed, 73 insertions(+), 58 deletions(-)
diff --git a/README.md b/README.md
index e32c455..bec30a3 100644
--- a/README.md
+++ b/README.md
@@ -2,14 +2,18 @@
this is source connector moudle for mongo,you can run this by running rocketmq connecotr api,
-some junit rely on mongo database you can start with a docker container
+some junit rely on mongo database you can flow step run a mongo container
-`docker run -p27027:27017 --name mongo-test -d mongo:4.0.10 --replSet "repl1"`
+- `docker run -p27027:27017 --name mongo-test -d mongo:4.0.10 --replSet "repl1"`
+- `docker exec -it mongo-test mongo `
+- `rs.initiate()`
-and then init a mongo replicaSet
+init a mongo replicaSet run all junit test
-`docker exec -it mongo-test mongo ` and `rs.initiate()` and then you can run all junit test
+## a special junit
+method `MongoFactoryTest#testSSLTrustStore` is for mongo ssl or tsl test,need mongod config ssl mode, if you want use ssl or tsl you need
+modify junit , appoint ssl or tsl pem path and password。
## task config params
@@ -32,3 +36,8 @@ and then init a mongo replicaSet
| zlibCompressionLevel | zlib compressors level| int (1-7)|
| trustStore | ssl pem| path|
| trustStorePassword | ssl pem decrypt password | string|
+
+
+## use case
+
+`http://127.0.0.1:8081/connectors/testMongoReplicaSet?config={"connector-class":"org.apache.connect.mongo.connector.MongoSourceConnector","oms-driver-url":"oms:rocketmq://localhost:9876/default:default","mongoAddr":"rep1/127.0.0.1:27077,127.0.0.1:27078,127.0.0.1:27080","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}`
diff --git a/src/main/java/org/apache/connect/mongo/initsync/InitSync.java b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
index 3d68fac..a51b727 100644
--- a/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
+++ b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
@@ -65,7 +65,8 @@ public class InitSync {
}
try {
countDownLatch.await();
- } catch (Exception e) {
+ } catch (InterruptedException e) {
+ logger.error("init sync wait countDownLatch interrupted");
} finally {
copyExecutor.shutdown();
}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/Filter.java b/src/main/java/org/apache/connect/mongo/replicator/OperationFilter.java
similarity index 96%
rename from src/main/java/org/apache/connect/mongo/replicator/Filter.java
rename to src/main/java/org/apache/connect/mongo/replicator/OperationFilter.java
index a517822..a173f6c 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/Filter.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/OperationFilter.java
@@ -29,13 +29,13 @@ import org.apache.connect.mongo.initsync.CollectionMeta;
import org.apache.connect.mongo.replicator.event.OperationType;
import org.apache.connect.mongo.replicator.event.ReplicationEvent;
-public class Filter {
+public class OperationFilter {
private Function<CollectionMeta, Boolean> dbAndCollectionFilter;
private Map<String, List<String>> interestMap = new HashMap<>();
private Function<OperationType, Boolean> notNoopFilter;
- public Filter(SourceTaskConfig sourceTaskConfig) {
+ public OperationFilter(SourceTaskConfig sourceTaskConfig) {
String interestDbAndCollection = sourceTaskConfig.getInterestDbAndCollection();
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetManager.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetManager.java
index c5757d8..88097d8 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetManager.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetManager.java
@@ -75,7 +75,7 @@ public class ReplicaSetManager {
}
private void validate() {
- Validate.isTrue(replicaConfigByName.size() > 0, "task config mongoAdd need special replicaSet addr");
+ Validate.isTrue(replicaConfigByName.size() > 0, "task config mongoAddr need special replicaSet addr");
}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java
index b067256..8dd85d7 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java
@@ -42,18 +42,16 @@ public class ReplicaSetsContext {
private AtomicBoolean initSyncAbort = new AtomicBoolean();
- private Filter filter;
+ private OperationFilter operationFilter;
private MongoClientFactory mongoClientFactory;
- private Map<String, Position> lastPositionMap;
public ReplicaSetsContext(SourceTaskConfig taskConfig) {
this.taskConfig = taskConfig;
this.replicaSets = new ArrayList<>();
- this.lastPositionMap = new HashMap<>();
this.dataEntryQueue = new LinkedBlockingDeque<>();
- this.filter = new Filter(taskConfig);
+ this.operationFilter = new OperationFilter(taskConfig);
this.mongoClientFactory = new MongoClientFactory(taskConfig);
}
@@ -62,11 +60,11 @@ public class ReplicaSetsContext {
}
public boolean filterEvent(ReplicationEvent event) {
- return filter.filterEvent(event);
+ return operationFilter.filterEvent(event);
}
public boolean filterMeta(CollectionMeta collectionMeta) {
- return filter.filterMeta(collectionMeta);
+ return operationFilter.filterMeta(collectionMeta);
}
public int getCopyThread() {
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
index 4c142ce..cd78f24 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
@@ -54,20 +54,23 @@ public class ReplicatorTask implements Runnable {
@Override
public void run() {
- BsonTimestamp firstAvailablePosition = findOplogFirstPosition();
-
- // inValid or
- // user config dataSync or
- // user config or runtime saved position lt first oplog position maybe some operation is lost so need dataSync
- if (!replicaSetConfig.getPosition().isValid() || replicaSetConfig.getPosition().isInitSync()
- || replicaSetConfig.getPosition().converBsonTimeStamp().compareTo(firstAvailablePosition) < 0) {
- recordOplogLastPosition();
+ BsonTimestamp firstAvailablePosition = findFirstOplogPosition();
+
+ Position userConfigOrRuntimePosition = replicaSetConfig.getPosition();
+
+ boolean needDataSync = !userConfigOrRuntimePosition.isValid()
+ || userConfigOrRuntimePosition.isInitSync()
+ // userConfigOrRuntimePosition.position < firstAvailablePosition maybe lost some operations
+ || userConfigOrRuntimePosition.converBsonTimeStamp().compareTo(firstAvailablePosition) < 0;
+
+ if (needDataSync) {
+ recordLastOplogPosition();
InitSync initSync = new InitSync(replicaSetConfig, mongoClient, replicaSetsContext, replicaSet);
initSync.start();
}
- if (!replicaSet.isRuning() || !replicaSetsContext.isInitSyncAbort()) {
+ if (!replicaSet.isRuning() || replicaSetsContext.isInitSyncAbort()) {
return;
}
@@ -96,7 +99,7 @@ public class ReplicatorTask implements Runnable {
logger.info("replicaSet:{}, already shutdown, replicaTask end of life cycle", replicaSetConfig);
}
- private BsonTimestamp findOplogFirstPosition() {
+ private BsonTimestamp findFirstOplogPosition() {
MongoDatabase localDataBase = mongoClient.getDatabase(Constants.MONGO_LOCAL_DATABASE);
FindIterable<Document> iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find();
Document lastOplog = iterable.sort(new Document("$natural", 1)).limit(1).first();
@@ -104,7 +107,7 @@ public class ReplicatorTask implements Runnable {
return timestamp;
}
- private void recordOplogLastPosition() {
+ private void recordLastOplogPosition() {
MongoDatabase localDataBase = mongoClient.getDatabase(Constants.MONGO_LOCAL_DATABASE);
FindIterable<Document> iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find();
Document lastOplog = iterable.sort(new Document("$natural", -1)).limit(1).first();
diff --git a/src/test/java/org/apache/connect/mongo/FilterTest.java b/src/test/java/org/apache/connect/mongo/FilterTest.java
index 31c8f4d..d5deefd 100644
--- a/src/test/java/org/apache/connect/mongo/FilterTest.java
+++ b/src/test/java/org/apache/connect/mongo/FilterTest.java
@@ -6,7 +6,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.connect.mongo.initsync.CollectionMeta;
-import org.apache.connect.mongo.replicator.Filter;
+import org.apache.connect.mongo.replicator.OperationFilter;
import org.apache.connect.mongo.replicator.event.OperationType;
import org.apache.connect.mongo.replicator.event.ReplicationEvent;
import org.junit.Assert;
@@ -30,16 +30,16 @@ public class FilterTest {
collections.add("person");
insterest.put("test", collections);
sourceTaskConfig.setInterestDbAndCollection(JSONObject.toJSONString(insterest));
- Filter filter = new Filter(sourceTaskConfig);
- Assert.assertTrue(filter.filterMeta(new CollectionMeta("test", "person")));
- Assert.assertFalse(filter.filterMeta(new CollectionMeta("test", "person01")));
+ OperationFilter operationFilter = new OperationFilter(sourceTaskConfig);
+ Assert.assertTrue(operationFilter.filterMeta(new CollectionMeta("test", "person")));
+ Assert.assertFalse(operationFilter.filterMeta(new CollectionMeta("test", "person01")));
}
@Test
public void testBlankDb() {
- Filter filter = new Filter(sourceTaskConfig);
- Assert.assertTrue(filter.filterMeta(new CollectionMeta("test", "test")));
- Assert.assertTrue(filter.filterMeta(new CollectionMeta("test1", "test01")));
+ OperationFilter operationFilter = new OperationFilter(sourceTaskConfig);
+ Assert.assertTrue(operationFilter.filterMeta(new CollectionMeta("test", "test")));
+ Assert.assertTrue(operationFilter.filterMeta(new CollectionMeta("test1", "test01")));
}
@Test
@@ -48,19 +48,19 @@ public class FilterTest {
collections.add("*");
insterest.put("test", collections);
sourceTaskConfig.setInterestDbAndCollection(JSONObject.toJSONString(insterest));
- Filter filter = new Filter(sourceTaskConfig);
- Assert.assertTrue(filter.filterMeta(new CollectionMeta("test", "testsad")));
- Assert.assertTrue(filter.filterMeta(new CollectionMeta("test", "tests032")));
+ OperationFilter operationFilter = new OperationFilter(sourceTaskConfig);
+ Assert.assertTrue(operationFilter.filterMeta(new CollectionMeta("test", "testsad")));
+ Assert.assertTrue(operationFilter.filterMeta(new CollectionMeta("test", "tests032")));
}
@Test
public void testFilterEvent() {
- Filter filter = new Filter(sourceTaskConfig);
+ OperationFilter operationFilter = new OperationFilter(sourceTaskConfig);
ReplicationEvent replicationEvent = new ReplicationEvent();
replicationEvent.setOperationType(OperationType.NOOP);
- Assert.assertFalse(filter.filterEvent(replicationEvent));
+ Assert.assertFalse(operationFilter.filterEvent(replicationEvent));
replicationEvent.setOperationType(OperationType.DB_COMMAND);
- Assert.assertTrue(filter.filterEvent(replicationEvent));
+ Assert.assertTrue(operationFilter.filterEvent(replicationEvent));
}
diff --git a/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java b/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java
index 0f02064..e47d2c4 100644
--- a/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java
+++ b/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java
@@ -2,12 +2,16 @@ package org.apache.connect.mongo;
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoTimeoutException;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
import com.mongodb.client.internal.MongoClientImpl;
import java.lang.reflect.Field;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.connect.mongo.replicator.MongoClientFactory;
import org.apache.connect.mongo.replicator.ReplicaSetConfig;
+import org.bson.Document;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -149,25 +153,25 @@ public class MongoFactoryTest {
return null;
}
-// @Test
-// public void testSSLTrustStore() {
-// sourceTaskConfig.setMongoUserName("user_test");
-// sourceTaskConfig.setMongoPassWord("pwd_test");
-// sourceTaskConfig.setSsl("ssl");
-// sourceTaskConfig.setSslInvalidHostNameAllowed("true");
-// sourceTaskConfig.setTrustStore("/Users/liping/test.pem");
-// sourceTaskConfig.setTrustStorePassword("test001");
-// sourceTaskConfig.setServerSelectionTimeoutMS("10000");
-// MongoClient client = mongoClientFactory.createMongoClient(replicaSetConfig);
-// MongoCollection<Document> collection = client.getDatabase("test").getCollection("person");
-// Document document = new Document();
-// document.put("name", "liping");
-// collection.insertOne(document);
-// MongoCursor<Document> iterator = collection.find().iterator();
-// while (iterator.hasNext()) {
-// System.out.println(iterator.next());
-// }
-//
-// }
+ @Test
+ public void testSSLTrustStore() {
+ sourceTaskConfig.setMongoUserName("user_test");
+ sourceTaskConfig.setMongoPassWord("pwd_test");
+ sourceTaskConfig.setSsl(true);
+ sourceTaskConfig.setSslInvalidHostNameAllowed(true);
+ sourceTaskConfig.setTrustStore("/Users/home/test.pem");
+ sourceTaskConfig.setTrustStorePassword("test001");
+ sourceTaskConfig.setServerSelectionTimeoutMS(10000);
+ MongoClient client = mongoClientFactory.createMongoClient(replicaSetConfig);
+ MongoCollection<Document> collection = client.getDatabase("test").getCollection("person");
+ Document document = new Document();
+ document.put("name", "test");
+ collection.insertOne(document);
+ MongoCursor<Document> iterator = collection.find().iterator();
+ while (iterator.hasNext()) {
+ System.out.println(iterator.next());
+ }
+
+ }
}