You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:22:28 UTC

[rocketmq-connect] 05/13: fix some bug

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 70caed3673b4d5b783447622410ddf3dcf9bdfee
Author: 李平 <lp...@alibaba-inc.com>
AuthorDate: Thu Aug 8 16:25:52 2019 +0800

    fix some bug
---
 .../org/apache/connect/mongo/MongoReplicatorConfig.java     | 13 ++++++++++---
 .../org/apache/connect/mongo/replicator/ReplicatorTask.java |  4 ++--
 2 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java b/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java
index 9f17aab..b7044ec 100644
--- a/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java
+++ b/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java
@@ -2,6 +2,7 @@ package org.apache.connect.mongo;
 
 import io.openmessaging.KeyValue;
 import org.apache.commons.lang3.StringUtils;
+import org.bson.BsonTimestamp;
 
 import java.lang.reflect.Method;
 import java.util.HashSet;
@@ -14,7 +15,7 @@ public class MongoReplicatorConfig {
     private String mongoUserName;
     private String mongoPassWord;
     private String interestDbAndCollection;
-    private long positionTimeStamp;
+    private int positionTimeStamp;
     private int positionInc;
     private String dataSync;
     private int copyThread = Runtime.getRuntime().availableProcessors();
@@ -42,11 +43,11 @@ public class MongoReplicatorConfig {
         this.copyThread = copyThread;
     }
 
-    public long getPositionTimeStamp() {
+    public int getPositionTimeStamp() {
         return positionTimeStamp;
     }
 
-    public void setPositionTimeStamp(long positionTimeStamp) {
+    public void setPositionTimeStamp(int positionTimeStamp) {
         this.positionTimeStamp = positionTimeStamp;
     }
 
@@ -153,4 +154,10 @@ public class MongoReplicatorConfig {
         }
         return replicaSet + ":" + mongoAddr;
     }
+
+
+    public BsonTimestamp getPosition() {
+        return new BsonTimestamp(positionTimeStamp, positionInc);
+    }
+
 }
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
index 9e4c67b..32bb25b 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
@@ -7,9 +7,9 @@ import com.mongodb.client.MongoCursor;
 import com.mongodb.client.MongoDatabase;
 import com.mongodb.client.model.Filters;
 import org.apache.connect.mongo.MongoReplicatorConfig;
+import org.apache.connect.mongo.initsync.InitSync;
 import org.apache.connect.mongo.replicator.event.DocumentConvertEvent;
 import org.apache.connect.mongo.replicator.event.ReplicationEvent;
-import org.apache.connect.mongo.initsync.InitSync;
 import org.bson.Document;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,7 +46,7 @@ public class ReplicatorTask implements Runnable {
         FindIterable<Document> iterable;
         if (mongoReplicatorConfig.getPositionTimeStamp() > 0 && mongoReplicatorConfig.getPositionTimeStamp() < System.currentTimeMillis()) {
             iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find(
-                    Filters.gt("ts", mongoReplicatorConfig.getPositionTimeStamp()));
+                    Filters.gt("ts", mongoReplicatorConfig.getPosition()));
         } else {
             iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find();
         }