You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:22:29 UTC

[rocketmq-connect] 06/13: fix bug

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 6777f2317f083112cadd652270fd141290122f1b
Author: 李平 <lp...@alibaba-inc.com>
AuthorDate: Thu Aug 8 19:53:39 2019 +0800

    fix bug
---
 .../org/apache/connect/mongo/MongoReplicatorConfig.java   |  6 +++---
 .../apache/connect/mongo/connector/MongoSourceTask.java   | 11 ++++++-----
 .../apache/connect/mongo/replicator/MongoReplicator.java  | 15 ++++++++++-----
 .../apache/connect/mongo/replicator/ReplicatorTask.java   |  2 +-
 src/test/java/org/apache/connect/mongo/MongoTest.java     | 10 +++++-----
 5 files changed, 25 insertions(+), 19 deletions(-)

diff --git a/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java b/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java
index b7044ec..9097640 100644
--- a/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java
+++ b/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java
@@ -17,7 +17,7 @@ public class MongoReplicatorConfig {
     private String interestDbAndCollection;
     private int positionTimeStamp;
     private int positionInc;
-    private String dataSync;
+    private boolean dataSync;
     private int copyThread = Runtime.getRuntime().availableProcessors();
 
 
@@ -85,11 +85,11 @@ public class MongoReplicatorConfig {
     }
 
 
-    public String getDataSync() {
+    public boolean getDataSync() {
         return dataSync;
     }
 
-    public void setDataSync(String dataSync) {
+    public void setDataSync(boolean dataSync) {
         this.dataSync = dataSync;
     }
 
diff --git a/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
index 9176ab7..7272878 100644
--- a/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
+++ b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
@@ -4,11 +4,11 @@ import com.alibaba.fastjson.JSONObject;
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.data.*;
 import io.openmessaging.connector.api.source.SourceTask;
-import org.apache.connect.mongo.replicator.Constants;
 import org.apache.connect.mongo.MongoReplicatorConfig;
+import org.apache.connect.mongo.replicator.Constants;
+import org.apache.connect.mongo.replicator.MongoReplicator;
 import org.apache.connect.mongo.replicator.event.OperationType;
 import org.apache.connect.mongo.replicator.event.ReplicationEvent;
-import org.apache.connect.mongo.replicator.MongoReplicator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,7 +44,7 @@ public class MongoSourceTask extends SourceTask {
         buildFieleds(schema);
         DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
         dataEntryBuilder.timestamp(System.currentTimeMillis())
-                .queue(event.getNamespace().replace(".", "-"))
+                .queue(event.getNamespace().replace(".", "-").replace("$", ""))
                 .entryType(event.getEntryType());
 
         if (event.getOperationType().ordinal() == OperationType.CREATED.ordinal()) {
@@ -78,10 +78,11 @@ public class MongoSourceTask extends SourceTask {
             if (position != null && position.array().length > 0) {
                 String positionJson = new String(position.array(), StandardCharsets.UTF_8);
                 JSONObject jsonObject = JSONObject.parseObject(positionJson);
-                replicatorConfig.setPositionTimeStamp(jsonObject.getLongValue("timeStamp"));
+                replicatorConfig.setPositionTimeStamp(jsonObject.getIntValue("timeStamp"));
                 replicatorConfig.setPositionInc(jsonObject.getIntValue("inc"));
+                replicatorConfig.setDataSync(jsonObject.getBooleanValue(Constants.INITSYNC));
             } else {
-                replicatorConfig.setDataSync(Constants.INITIAL);
+                replicatorConfig.setDataSync(true);
             }
             mongoReplicator.start();
         }catch (Throwable throwable) {
diff --git a/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java b/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java
index 0f8520b..60b8d3d 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java
@@ -1,16 +1,21 @@
 package org.apache.connect.mongo.replicator;
 
-import com.mongodb.*;
-import com.mongodb.client.*;
+import com.mongodb.ConnectionString;
+import com.mongodb.MongoClientSettings;
 import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.MongoIterable;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.Validate;
 import org.apache.connect.mongo.MongoReplicatorConfig;
 import org.apache.connect.mongo.replicator.event.ReplicationEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.connect.mongo.replicator.Constants.*;
@@ -81,7 +86,7 @@ public class MongoReplicator {
         }
         sb.append(mongoReplicatorConfig.getMongoAddr());
         sb.append("/");
-        if (StringUtils.isBlank(mongoReplicatorConfig.getReplicaSet())) {
+        if (StringUtils.isNotBlank(mongoReplicatorConfig.getReplicaSet())) {
             sb.append("?");
             sb.append("replicaSet=");
             sb.append(mongoReplicatorConfig.getReplicaSet());
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
index 32bb25b..766225f 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
@@ -37,7 +37,7 @@ public class ReplicatorTask implements Runnable {
     @Override
     public void run() {
 
-        if (Constants.INITIAL.equals(mongoReplicatorConfig.getDataSync())) {
+        if (mongoReplicatorConfig.getDataSync()) {
             InitSync initSync = new InitSync(mongoReplicatorConfig, mongoClient, filter, mongoReplicator);
             initSync.start();
         }
diff --git a/src/test/java/org/apache/connect/mongo/MongoTest.java b/src/test/java/org/apache/connect/mongo/MongoTest.java
index 849c00c..cc83fbe 100644
--- a/src/test/java/org/apache/connect/mongo/MongoTest.java
+++ b/src/test/java/org/apache/connect/mongo/MongoTest.java
@@ -2,10 +2,10 @@ package org.apache.connect.mongo;
 
 import com.alibaba.fastjson.JSONObject;
 import com.mongodb.ConnectionString;
-import com.mongodb.CursorType;
 import com.mongodb.MongoClientSettings;
-import com.mongodb.client.*;
-import com.mongodb.client.model.Filters;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
 import io.openmessaging.connector.api.data.EntryType;
 import org.apache.connect.mongo.initsync.InitSync;
 import org.apache.connect.mongo.replicator.Constants;
@@ -36,7 +36,7 @@ public class MongoTest {
     @Before
     public void before() {
         MongoClientSettings.Builder builder = MongoClientSettings.builder();
-        builder.applyConnectionString(new ConnectionString("mongodb://127.0.0.1:27077"));
+        builder.applyConnectionString(new ConnectionString("mongodb://127.0.0.1:27018"));
         mongoClient = MongoClients.create(builder.build());
     }
 
@@ -68,7 +68,7 @@ public class MongoTest {
     public void testInitSyncCopy() throws NoSuchFieldException, IllegalAccessException, InterruptedException {
         MongoCollection<Document> collection = mongoClient.getDatabase("test").getCollection("person");
         collection.deleteMany(new Document());
-        int count = 100;
+        int count = 1;
         List<Document> documents = new ArrayList<>(count);
         for (int i = 0; i < count; i++) {
             Document document = new Document();