You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:25 UTC

[rocketmq-connect] 15/39: Increase topic change awareness monitoring. resolve #398

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit ebb44c150bc43fe187c28b44e0b1915c2e901dfb
Author: xujianhai666 <ze...@bytedance.com>
AuthorDate: Sun Sep 22 10:26:09 2019 +0800

     Increase topic change awareness monitoring. resolve #398
---
 .../rocketmq/replicator/RmqSourceReplicator.java   | 48 +++++++++++++++++++++-
 1 file changed, 47 insertions(+), 1 deletion(-)

diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
index e124f15..429102e 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
@@ -25,14 +25,18 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.commons.lang3.text.StrSubstitutor;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.body.TopicList;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
@@ -78,9 +82,12 @@ public class RmqSourceReplicator extends SourceConnector {
 
     private volatile boolean adminStarted;
 
+    private ScheduledExecutorService executor;
+
     public RmqSourceReplicator() {
         topicRouteMap = new HashMap<String, List<TaskTopicInfo>>();
         whiteList = new HashSet<String>();
+        executor = Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("RmqSourceReplicator-SourceWatcher-%d").daemon(true).build());
     }
 
     private synchronized void startMQAdminTools() {
@@ -151,6 +158,44 @@ public class RmqSourceReplicator extends SourceConnector {
     @Override
     public void start() {
         startMQAdminTools();
+        startListner();
+    }
+
+    public void startListner() {
+        executor.scheduleAtFixedRate(new Runnable() {
+            @Override public void run() {
+                Map<String, List<TaskTopicInfo>> origin = topicRouteMap;
+                topicRouteMap = new HashMap<String, List<TaskTopicInfo>>();
+
+                buildRoute();
+
+                if (!compare(origin, topicRouteMap)) {
+                    context.requestTaskReconfiguration();
+                }
+            }
+        }, 30, 30, TimeUnit.SECONDS);
+    }
+
+    public boolean compare(Map<String, List<TaskTopicInfo>> origin, Map<String, List<TaskTopicInfo>> updated) {
+        if (origin.size() != updated.size()) {
+            return false;
+        }
+        for (Map.Entry<String, List<TaskTopicInfo>> entry : origin.entrySet()) {
+            if (!updated.containsKey(entry.getKey())) {
+                return false;
+            }
+            List<TaskTopicInfo> originTasks = entry.getValue();
+            List<TaskTopicInfo> updateTasks = updated.get(entry.getKey());
+            if (originTasks.size() != updateTasks.size()) {
+                return false;
+            }
+
+            if (!originTasks.containsAll(updateTasks)) {
+                return false;
+            }
+        }
+
+        return true;
     }
 
     public void stop() {
@@ -169,6 +214,7 @@ public class RmqSourceReplicator extends SourceConnector {
         return RmqSourceTask.class;
     }
 
+    @Override
     public List<KeyValue> taskConfigs() {
         if (!configValid) {
             return new ArrayList<KeyValue>();