You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:22:34 UTC

[rocketmq-connect] 11/13: clean code

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit db51f7ff808a10209ff7eafb68cec0e08f25f058
Author: 李平 <17...@qq.com>
AuthorDate: Mon Aug 26 14:20:51 2019 +0800

    clean code
---
 README.md                                          | 17 +++++++--
 .../apache/connect/mongo/initsync/InitSync.java    |  3 +-
 .../{Filter.java => OperationFilter.java}          |  4 +-
 .../mongo/replicator/ReplicaSetManager.java        |  2 +-
 .../mongo/replicator/ReplicaSetsContext.java       | 10 ++---
 .../connect/mongo/replicator/ReplicatorTask.java   | 25 ++++++------
 .../java/org/apache/connect/mongo/FilterTest.java  | 26 ++++++-------
 .../org/apache/connect/mongo/MongoFactoryTest.java | 44 ++++++++++++----------
 8 files changed, 73 insertions(+), 58 deletions(-)

diff --git a/README.md b/README.md
index e32c455..bec30a3 100644
--- a/README.md
+++ b/README.md
@@ -2,14 +2,18 @@
 
 this is source connector moudle for mongo,you can run this by running rocketmq connecotr api,
 
-some junit rely on mongo database you can start with a docker container
+some junit rely on mongo database you can flow step run a mongo container
 
-`docker run -p27027:27017 --name mongo-test -d  mongo:4.0.10 --replSet "repl1"`
+- `docker run -p27027:27017 --name mongo-test -d  mongo:4.0.10 --replSet "repl1"`
+- `docker exec -it mongo-test mongo `
+- `rs.initiate()` 
 
-and then init a mongo replicaSet
+init a mongo replicaSet run all junit test
 
-`docker exec -it mongo-test mongo ` and `rs.initiate()` and then you can run all junit test
 
+## a special junit
+method `MongoFactoryTest#testSSLTrustStore` is for mongo ssl or tsl test,need mongod config ssl mode, if you want use ssl or tsl you need 
+modify junit , appoint ssl or tsl pem path and password。
 
 
 ## task config params
@@ -32,3 +36,8 @@ and then init a mongo replicaSet
 | zlibCompressionLevel | zlib compressors level| int (1-7)|
 | trustStore | ssl pem| path|
 | trustStorePassword | ssl pem decrypt password | string|
+
+
+## use case
+
+`http://127.0.0.1:8081/connectors/testMongoReplicaSet?config={"connector-class":"org.apache.connect.mongo.connector.MongoSourceConnector","oms-driver-url":"oms:rocketmq://localhost:9876/default:default","mongoAddr":"rep1/127.0.0.1:27077,127.0.0.1:27078,127.0.0.1:27080","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}`
diff --git a/src/main/java/org/apache/connect/mongo/initsync/InitSync.java b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
index 3d68fac..a51b727 100644
--- a/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
+++ b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
@@ -65,7 +65,8 @@ public class InitSync {
         }
         try {
             countDownLatch.await();
-        } catch (Exception e) {
+        } catch (InterruptedException e) {
+            logger.error("init sync wait countDownLatch interrupted");
         } finally {
             copyExecutor.shutdown();
         }
diff --git a/src/main/java/org/apache/connect/mongo/replicator/Filter.java b/src/main/java/org/apache/connect/mongo/replicator/OperationFilter.java
similarity index 96%
rename from src/main/java/org/apache/connect/mongo/replicator/Filter.java
rename to src/main/java/org/apache/connect/mongo/replicator/OperationFilter.java
index a517822..a173f6c 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/Filter.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/OperationFilter.java
@@ -29,13 +29,13 @@ import org.apache.connect.mongo.initsync.CollectionMeta;
 import org.apache.connect.mongo.replicator.event.OperationType;
 import org.apache.connect.mongo.replicator.event.ReplicationEvent;
 
-public class Filter {
+public class OperationFilter {
 
     private Function<CollectionMeta, Boolean> dbAndCollectionFilter;
     private Map<String, List<String>> interestMap = new HashMap<>();
     private Function<OperationType, Boolean> notNoopFilter;
 
-    public Filter(SourceTaskConfig sourceTaskConfig) {
+    public OperationFilter(SourceTaskConfig sourceTaskConfig) {
 
         String interestDbAndCollection = sourceTaskConfig.getInterestDbAndCollection();
 
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetManager.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetManager.java
index c5757d8..88097d8 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetManager.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetManager.java
@@ -75,7 +75,7 @@ public class ReplicaSetManager {
     }
 
     private void validate() {
-        Validate.isTrue(replicaConfigByName.size() > 0, "task config mongoAdd need special replicaSet addr");
+        Validate.isTrue(replicaConfigByName.size() > 0, "task config mongoAddr need special replicaSet addr");
 
     }
 
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java
index b067256..8dd85d7 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java
@@ -42,18 +42,16 @@ public class ReplicaSetsContext {
 
     private AtomicBoolean initSyncAbort = new AtomicBoolean();
 
-    private Filter filter;
+    private OperationFilter operationFilter;
 
     private MongoClientFactory mongoClientFactory;
 
-    private Map<String, Position> lastPositionMap;
 
     public ReplicaSetsContext(SourceTaskConfig taskConfig) {
         this.taskConfig = taskConfig;
         this.replicaSets = new ArrayList<>();
-        this.lastPositionMap = new HashMap<>();
         this.dataEntryQueue = new LinkedBlockingDeque<>();
-        this.filter = new Filter(taskConfig);
+        this.operationFilter = new OperationFilter(taskConfig);
         this.mongoClientFactory = new MongoClientFactory(taskConfig);
     }
 
@@ -62,11 +60,11 @@ public class ReplicaSetsContext {
     }
 
     public boolean filterEvent(ReplicationEvent event) {
-        return filter.filterEvent(event);
+        return operationFilter.filterEvent(event);
     }
 
     public boolean filterMeta(CollectionMeta collectionMeta) {
-        return filter.filterMeta(collectionMeta);
+        return operationFilter.filterMeta(collectionMeta);
     }
 
     public int getCopyThread() {
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
index 4c142ce..cd78f24 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
@@ -54,20 +54,23 @@ public class ReplicatorTask implements Runnable {
     @Override
     public void run() {
 
-        BsonTimestamp firstAvailablePosition = findOplogFirstPosition();
-
-        // inValid or
-        // user config dataSync or
-        // user config or runtime saved position lt first oplog position maybe some operation is lost so need dataSync
-        if (!replicaSetConfig.getPosition().isValid() || replicaSetConfig.getPosition().isInitSync()
-            || replicaSetConfig.getPosition().converBsonTimeStamp().compareTo(firstAvailablePosition) < 0) {
-            recordOplogLastPosition();
+        BsonTimestamp firstAvailablePosition = findFirstOplogPosition();
+
+        Position userConfigOrRuntimePosition = replicaSetConfig.getPosition();
+
+        boolean needDataSync = !userConfigOrRuntimePosition.isValid()
+            || userConfigOrRuntimePosition.isInitSync()
+            // userConfigOrRuntimePosition.position < firstAvailablePosition maybe lost some operations
+            || userConfigOrRuntimePosition.converBsonTimeStamp().compareTo(firstAvailablePosition) < 0;
+
+        if (needDataSync) {
+            recordLastOplogPosition();
             InitSync initSync = new InitSync(replicaSetConfig, mongoClient, replicaSetsContext, replicaSet);
             initSync.start();
 
         }
 
-        if (!replicaSet.isRuning() || !replicaSetsContext.isInitSyncAbort()) {
+        if (!replicaSet.isRuning() || replicaSetsContext.isInitSyncAbort()) {
             return;
         }
 
@@ -96,7 +99,7 @@ public class ReplicatorTask implements Runnable {
         logger.info("replicaSet:{}, already shutdown, replicaTask end of life cycle", replicaSetConfig);
     }
 
-    private BsonTimestamp findOplogFirstPosition() {
+    private BsonTimestamp findFirstOplogPosition() {
         MongoDatabase localDataBase = mongoClient.getDatabase(Constants.MONGO_LOCAL_DATABASE);
         FindIterable<Document> iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find();
         Document lastOplog = iterable.sort(new Document("$natural", 1)).limit(1).first();
@@ -104,7 +107,7 @@ public class ReplicatorTask implements Runnable {
         return timestamp;
     }
 
-    private void recordOplogLastPosition() {
+    private void recordLastOplogPosition() {
         MongoDatabase localDataBase = mongoClient.getDatabase(Constants.MONGO_LOCAL_DATABASE);
         FindIterable<Document> iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find();
         Document lastOplog = iterable.sort(new Document("$natural", -1)).limit(1).first();
diff --git a/src/test/java/org/apache/connect/mongo/FilterTest.java b/src/test/java/org/apache/connect/mongo/FilterTest.java
index 31c8f4d..d5deefd 100644
--- a/src/test/java/org/apache/connect/mongo/FilterTest.java
+++ b/src/test/java/org/apache/connect/mongo/FilterTest.java
@@ -6,7 +6,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.connect.mongo.initsync.CollectionMeta;
-import org.apache.connect.mongo.replicator.Filter;
+import org.apache.connect.mongo.replicator.OperationFilter;
 import org.apache.connect.mongo.replicator.event.OperationType;
 import org.apache.connect.mongo.replicator.event.ReplicationEvent;
 import org.junit.Assert;
@@ -30,16 +30,16 @@ public class FilterTest {
         collections.add("person");
         insterest.put("test", collections);
         sourceTaskConfig.setInterestDbAndCollection(JSONObject.toJSONString(insterest));
-        Filter filter = new Filter(sourceTaskConfig);
-        Assert.assertTrue(filter.filterMeta(new CollectionMeta("test", "person")));
-        Assert.assertFalse(filter.filterMeta(new CollectionMeta("test", "person01")));
+        OperationFilter operationFilter = new OperationFilter(sourceTaskConfig);
+        Assert.assertTrue(operationFilter.filterMeta(new CollectionMeta("test", "person")));
+        Assert.assertFalse(operationFilter.filterMeta(new CollectionMeta("test", "person01")));
     }
 
     @Test
     public void testBlankDb() {
-        Filter filter = new Filter(sourceTaskConfig);
-        Assert.assertTrue(filter.filterMeta(new CollectionMeta("test", "test")));
-        Assert.assertTrue(filter.filterMeta(new CollectionMeta("test1", "test01")));
+        OperationFilter operationFilter = new OperationFilter(sourceTaskConfig);
+        Assert.assertTrue(operationFilter.filterMeta(new CollectionMeta("test", "test")));
+        Assert.assertTrue(operationFilter.filterMeta(new CollectionMeta("test1", "test01")));
     }
 
     @Test
@@ -48,19 +48,19 @@ public class FilterTest {
         collections.add("*");
         insterest.put("test", collections);
         sourceTaskConfig.setInterestDbAndCollection(JSONObject.toJSONString(insterest));
-        Filter filter = new Filter(sourceTaskConfig);
-        Assert.assertTrue(filter.filterMeta(new CollectionMeta("test", "testsad")));
-        Assert.assertTrue(filter.filterMeta(new CollectionMeta("test", "tests032")));
+        OperationFilter operationFilter = new OperationFilter(sourceTaskConfig);
+        Assert.assertTrue(operationFilter.filterMeta(new CollectionMeta("test", "testsad")));
+        Assert.assertTrue(operationFilter.filterMeta(new CollectionMeta("test", "tests032")));
     }
 
     @Test
     public void testFilterEvent() {
-        Filter filter = new Filter(sourceTaskConfig);
+        OperationFilter operationFilter = new OperationFilter(sourceTaskConfig);
         ReplicationEvent replicationEvent = new ReplicationEvent();
         replicationEvent.setOperationType(OperationType.NOOP);
-        Assert.assertFalse(filter.filterEvent(replicationEvent));
+        Assert.assertFalse(operationFilter.filterEvent(replicationEvent));
         replicationEvent.setOperationType(OperationType.DB_COMMAND);
-        Assert.assertTrue(filter.filterEvent(replicationEvent));
+        Assert.assertTrue(operationFilter.filterEvent(replicationEvent));
 
     }
 
diff --git a/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java b/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java
index 0f02064..e47d2c4 100644
--- a/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java
+++ b/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java
@@ -2,12 +2,16 @@ package org.apache.connect.mongo;
 
 import com.mongodb.MongoClientSettings;
 import com.mongodb.MongoTimeoutException;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
 import com.mongodb.client.internal.MongoClientImpl;
 import java.lang.reflect.Field;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.connect.mongo.replicator.MongoClientFactory;
 import org.apache.connect.mongo.replicator.ReplicaSetConfig;
+import org.bson.Document;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -149,25 +153,25 @@ public class MongoFactoryTest {
         return null;
     }
 
-//    @Test
-//    public void testSSLTrustStore() {
-//        sourceTaskConfig.setMongoUserName("user_test");
-//        sourceTaskConfig.setMongoPassWord("pwd_test");
-//        sourceTaskConfig.setSsl("ssl");
-//        sourceTaskConfig.setSslInvalidHostNameAllowed("true");
-//        sourceTaskConfig.setTrustStore("/Users/liping/test.pem");
-//        sourceTaskConfig.setTrustStorePassword("test001");
-//        sourceTaskConfig.setServerSelectionTimeoutMS("10000");
-//        MongoClient client = mongoClientFactory.createMongoClient(replicaSetConfig);
-//        MongoCollection<Document> collection = client.getDatabase("test").getCollection("person");
-//        Document document = new Document();
-//        document.put("name", "liping");
-//        collection.insertOne(document);
-//        MongoCursor<Document> iterator = collection.find().iterator();
-//        while (iterator.hasNext()) {
-//            System.out.println(iterator.next());
-//        }
-//
-//    }
+    @Test
+    public void testSSLTrustStore() {
+        sourceTaskConfig.setMongoUserName("user_test");
+        sourceTaskConfig.setMongoPassWord("pwd_test");
+        sourceTaskConfig.setSsl(true);
+        sourceTaskConfig.setSslInvalidHostNameAllowed(true);
+        sourceTaskConfig.setTrustStore("/Users/home/test.pem");
+        sourceTaskConfig.setTrustStorePassword("test001");
+        sourceTaskConfig.setServerSelectionTimeoutMS(10000);
+        MongoClient client = mongoClientFactory.createMongoClient(replicaSetConfig);
+        MongoCollection<Document> collection = client.getDatabase("test").getCollection("person");
+        Document document = new Document();
+        document.put("name", "test");
+        collection.insertOne(document);
+        MongoCursor<Document> iterator = collection.find().iterator();
+        while (iterator.hasNext()) {
+            System.out.println(iterator.next());
+        }
+
+    }
 
 }