You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:22:29 UTC
[rocketmq-connect] 06/13: fix bug
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 6777f2317f083112cadd652270fd141290122f1b
Author: 李平 <lp...@alibaba-inc.com>
AuthorDate: Thu Aug 8 19:53:39 2019 +0800
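fix bug
Summary of the changes in this commit:
- MongoReplicatorConfig: dataSync is now a boolean instead of a String.
- MongoSourceTask: also strip "$" from the queue name derived from the namespace; read the saved "timeStamp" position with getIntValue instead of getLongValue; restore dataSync from the saved position (Constants.INITSYNC) and set it to true when no position exists.
- MongoReplicator: append the replicaSet query parameter only when a replica set is configured (the blank check was inverted); replace wildcard imports with explicit ones.
- ReplicatorTask: run the initial sync based on the boolean dataSync flag.
- MongoTest: point the test at port 27018 and insert a single document in testInitSyncCopy.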
---
.../org/apache/connect/mongo/MongoReplicatorConfig.java | 6 +++---
.../apache/connect/mongo/connector/MongoSourceTask.java | 11 ++++++-----
.../apache/connect/mongo/replicator/MongoReplicator.java | 15 ++++++++++-----
.../apache/connect/mongo/replicator/ReplicatorTask.java | 2 +-
src/test/java/org/apache/connect/mongo/MongoTest.java | 10 +++++-----
5 files changed, 25 insertions(+), 19 deletions(-)
diff --git a/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java b/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java
index b7044ec..9097640 100644
--- a/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java
+++ b/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java
@@ -17,7 +17,7 @@ public class MongoReplicatorConfig {
private String interestDbAndCollection;
private int positionTimeStamp;
private int positionInc;
- private String dataSync;
+ private boolean dataSync;
private int copyThread = Runtime.getRuntime().availableProcessors();
@@ -85,11 +85,11 @@ public class MongoReplicatorConfig {
}
- public String getDataSync() {
+ public boolean getDataSync() {
return dataSync;
}
- public void setDataSync(String dataSync) {
+ public void setDataSync(boolean dataSync) {
this.dataSync = dataSync;
}
diff --git a/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
index 9176ab7..7272878 100644
--- a/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
+++ b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
@@ -4,11 +4,11 @@ import com.alibaba.fastjson.JSONObject;
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.data.*;
import io.openmessaging.connector.api.source.SourceTask;
-import org.apache.connect.mongo.replicator.Constants;
import org.apache.connect.mongo.MongoReplicatorConfig;
+import org.apache.connect.mongo.replicator.Constants;
+import org.apache.connect.mongo.replicator.MongoReplicator;
import org.apache.connect.mongo.replicator.event.OperationType;
import org.apache.connect.mongo.replicator.event.ReplicationEvent;
-import org.apache.connect.mongo.replicator.MongoReplicator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,7 +44,7 @@ public class MongoSourceTask extends SourceTask {
buildFieleds(schema);
DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
dataEntryBuilder.timestamp(System.currentTimeMillis())
- .queue(event.getNamespace().replace(".", "-"))
+ .queue(event.getNamespace().replace(".", "-").replace("$", ""))
.entryType(event.getEntryType());
if (event.getOperationType().ordinal() == OperationType.CREATED.ordinal()) {
@@ -78,10 +78,11 @@ public class MongoSourceTask extends SourceTask {
if (position != null && position.array().length > 0) {
String positionJson = new String(position.array(), StandardCharsets.UTF_8);
JSONObject jsonObject = JSONObject.parseObject(positionJson);
- replicatorConfig.setPositionTimeStamp(jsonObject.getLongValue("timeStamp"));
+ replicatorConfig.setPositionTimeStamp(jsonObject.getIntValue("timeStamp"));
replicatorConfig.setPositionInc(jsonObject.getIntValue("inc"));
+ replicatorConfig.setDataSync(jsonObject.getBooleanValue(Constants.INITSYNC));
} else {
- replicatorConfig.setDataSync(Constants.INITIAL);
+ replicatorConfig.setDataSync(true);
}
mongoReplicator.start();
}catch (Throwable throwable) {
diff --git a/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java b/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java
index 0f8520b..60b8d3d 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java
@@ -1,16 +1,21 @@
package org.apache.connect.mongo.replicator;
-import com.mongodb.*;
-import com.mongodb.client.*;
+import com.mongodb.ConnectionString;
+import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.MongoIterable;
import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.Validate;
import org.apache.connect.mongo.MongoReplicatorConfig;
import org.apache.connect.mongo.replicator.event.ReplicationEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.connect.mongo.replicator.Constants.*;
@@ -81,7 +86,7 @@ public class MongoReplicator {
}
sb.append(mongoReplicatorConfig.getMongoAddr());
sb.append("/");
- if (StringUtils.isBlank(mongoReplicatorConfig.getReplicaSet())) {
+ if (StringUtils.isNotBlank(mongoReplicatorConfig.getReplicaSet())) {
sb.append("?");
sb.append("replicaSet=");
sb.append(mongoReplicatorConfig.getReplicaSet());
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
index 32bb25b..766225f 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
@@ -37,7 +37,7 @@ public class ReplicatorTask implements Runnable {
@Override
public void run() {
- if (Constants.INITIAL.equals(mongoReplicatorConfig.getDataSync())) {
+ if (mongoReplicatorConfig.getDataSync()) {
InitSync initSync = new InitSync(mongoReplicatorConfig, mongoClient, filter, mongoReplicator);
initSync.start();
}
diff --git a/src/test/java/org/apache/connect/mongo/MongoTest.java b/src/test/java/org/apache/connect/mongo/MongoTest.java
index 849c00c..cc83fbe 100644
--- a/src/test/java/org/apache/connect/mongo/MongoTest.java
+++ b/src/test/java/org/apache/connect/mongo/MongoTest.java
@@ -2,10 +2,10 @@ package org.apache.connect.mongo;
import com.alibaba.fastjson.JSONObject;
import com.mongodb.ConnectionString;
-import com.mongodb.CursorType;
import com.mongodb.MongoClientSettings;
-import com.mongodb.client.*;
-import com.mongodb.client.model.Filters;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
import io.openmessaging.connector.api.data.EntryType;
import org.apache.connect.mongo.initsync.InitSync;
import org.apache.connect.mongo.replicator.Constants;
@@ -36,7 +36,7 @@ public class MongoTest {
@Before
public void before() {
MongoClientSettings.Builder builder = MongoClientSettings.builder();
- builder.applyConnectionString(new ConnectionString("mongodb://127.0.0.1:27077"));
+ builder.applyConnectionString(new ConnectionString("mongodb://127.0.0.1:27018"));
mongoClient = MongoClients.create(builder.build());
}
@@ -68,7 +68,7 @@ public class MongoTest {
public void testInitSyncCopy() throws NoSuchFieldException, IllegalAccessException, InterruptedException {
MongoCollection<Document> collection = mongoClient.getDatabase("test").getCollection("person");
collection.deleteMany(new Document());
- int count = 100;
+ int count = 1;
List<Document> documents = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
Document document = new Document();