You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:37 UTC

[rocketmq-connect] 27/39: [ISSUE #478] TopicList is null exception and frequent requestTaskReconfiguration (#483)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit f55d2132317e94529a9e70fbcd896f1e42c51a33
Author: zhoubo <87...@qq.com>
AuthorDate: Thu Dec 19 09:24:00 2019 +0800

    [ISSUE #478] TopicList is null exception and frequent requestTaskReconfiguration (#483)
    
    * Fix TopicList null exception and frequent requestTaskReconfiguration calls
    
    * https://github.com/apache/rocketmq-externals/issues/478
    
    * Runtime: add some logging
    * Fix replicator bug that caused frequent requestTaskReconfiguration when a new topic is added
---
 .../rocketmq/replicator/RmqSourceReplicator.java   | 30 ++++++++++++++--------
 .../apache/rocketmq/replicator/RmqSourceTask.java  |  2 +-
 .../strategy/DivideTaskByConsistentHash.java       |  3 ++-
 .../replicator/strategy/DivideTaskByQueue.java     |  4 ++-
 .../replicator/strategy/DivideTaskByTopic.java     |  6 ++---
 .../replicator/strategy/TaskDivideStrategy.java    |  3 ++-
 6 files changed, 30 insertions(+), 18 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
index e868b89..82744ed 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
@@ -63,7 +63,7 @@ public class RmqSourceReplicator extends SourceConnector {
 
     private RmqConnectorConfig replicatorConfig;
 
-    private Map<String, List<TaskTopicInfo>> topicRouteMap;
+    private Map<String, Set<TaskTopicInfo>> topicRouteMap;
 
     private volatile boolean configValid = false;
 
@@ -75,7 +75,7 @@ public class RmqSourceReplicator extends SourceConnector {
     private ScheduledExecutorService executor;
 
     public RmqSourceReplicator() {
-        topicRouteMap = new HashMap<String, List<TaskTopicInfo>>();
+        topicRouteMap = new HashMap<>();
         replicatorConfig = new RmqConnectorConfig();
         executor = Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("RmqSourceReplicator-SourceWatcher-%d").daemon(true).build());
     }
@@ -130,34 +130,42 @@ public class RmqSourceReplicator extends SourceConnector {
     @Override
     public void start() {
         startMQAdminTools();
+        buildRoute();
         startListner();
     }
 
     public void startListner() {
         executor.scheduleAtFixedRate(new Runnable() {
+
+            boolean first = true;
+            Map<String, Set<TaskTopicInfo>> origin = null;
+
+
             @Override public void run() {
-                Map<String, List<TaskTopicInfo>> origin = topicRouteMap;
-                topicRouteMap = new HashMap<String, List<TaskTopicInfo>>();
 
                 buildRoute();
-
+                if (first) {
+                    origin = new HashMap<>(topicRouteMap);
+                    first = false;
+                }
                 if (!compare(origin, topicRouteMap)) {
                     context.requestTaskReconfiguration();
+                    origin = new HashMap<>(topicRouteMap);
                 }
             }
         }, replicatorConfig.getRefreshInterval(), replicatorConfig.getRefreshInterval(), TimeUnit.SECONDS);
     }
 
-    public boolean compare(Map<String, List<TaskTopicInfo>> origin, Map<String, List<TaskTopicInfo>> updated) {
+    public boolean compare(Map<String, Set<TaskTopicInfo>> origin, Map<String, Set<TaskTopicInfo>> updated) {
         if (origin.size() != updated.size()) {
             return false;
         }
-        for (Map.Entry<String, List<TaskTopicInfo>> entry : origin.entrySet()) {
+        for (Map.Entry<String, Set<TaskTopicInfo>> entry : origin.entrySet()) {
             if (!updated.containsKey(entry.getKey())) {
                 return false;
             }
-            List<TaskTopicInfo> originTasks = entry.getValue();
-            List<TaskTopicInfo> updateTasks = updated.get(entry.getKey());
+            Set<TaskTopicInfo> originTasks = entry.getValue();
+            Set<TaskTopicInfo> updateTasks = updated.get(entry.getKey());
             if (originTasks.size() != updateTasks.size()) {
                 return false;
             }
@@ -249,7 +257,7 @@ public class RmqSourceReplicator extends SourceConnector {
 
                             TopicRouteData topicRouteData = srcMQAdminExt.examineTopicRouteInfo(topic);
                             if (!topicRouteMap.containsKey(topic)) {
-                                topicRouteMap.put(topic, new ArrayList<TaskTopicInfo>());
+                                topicRouteMap.put(topic, new HashSet<>(16));
                             }
                             for (QueueData qd : topicRouteData.getQueueDatas()) {
                                 if (brokerNameSet.contains(qd.getBrokerName())) {
@@ -270,7 +278,7 @@ public class RmqSourceReplicator extends SourceConnector {
         }
     }
 
-    public Map<String, List<TaskTopicInfo>> getTopicRouteMap() {
+    public Map<String, Set<TaskTopicInfo>> getTopicRouteMap() {
         return this.topicRouteMap;
     }
 
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index b504e85..3e8d78b 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -121,7 +121,7 @@ public class RmqSourceTask extends SourceTask {
         log.info("RocketMQ source task started");
     }
 
-    public void stop() {
+    @Override public void stop() {
 
         if (started) {
             if (this.consumer != null) {
diff --git a/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByConsistentHash.java b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByConsistentHash.java
index b027246..1f01be0 100644
--- a/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByConsistentHash.java
+++ b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByConsistentHash.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.rocketmq.common.consistenthash.ConsistentHashRouter;
 import org.apache.rocketmq.common.consistenthash.Node;
 import org.apache.rocketmq.replicator.config.DataType;
@@ -31,7 +32,7 @@ import org.apache.rocketmq.replicator.config.TaskDivideConfig;
 import org.apache.rocketmq.replicator.config.TaskTopicInfo;
 
 public class DivideTaskByConsistentHash extends TaskDivideStrategy {
-    @Override public List<KeyValue> divide(Map<String, List<TaskTopicInfo>> topicMap, TaskDivideConfig tdc) {
+    @Override public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> topicMap, TaskDivideConfig tdc) {
 
         List<KeyValue> config = new ArrayList<>();
         int parallelism = tdc.getTaskParallelism();
diff --git a/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
index 0a493de..e587250 100644
--- a/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
+++ b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
@@ -20,6 +20,7 @@ import com.alibaba.fastjson.JSONObject;
 import io.openmessaging.KeyValue;
 import io.openmessaging.internal.DefaultKeyValue;
 import java.util.HashMap;
+import java.util.Set;
 import org.apache.rocketmq.replicator.config.DataType;
 import org.apache.rocketmq.replicator.config.TaskConfigEnum;
 import org.apache.rocketmq.replicator.config.TaskDivideConfig;
@@ -29,7 +30,8 @@ import java.util.Map;
 import org.apache.rocketmq.replicator.config.TaskTopicInfo;
 
 public class DivideTaskByQueue extends TaskDivideStrategy {
-    public List<KeyValue> divide(Map<String, List<TaskTopicInfo>> topicRouteMap, TaskDivideConfig tdc) {
+
+    @Override public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> topicRouteMap, TaskDivideConfig tdc) {
 
         List<KeyValue> config = new ArrayList<KeyValue>();
         int parallelism = tdc.getTaskParallelism();
diff --git a/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java
index f8dc8cf..77ea7cc 100644
--- a/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java
+++ b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java
@@ -19,7 +19,7 @@ package org.apache.rocketmq.replicator.strategy;
 import com.alibaba.fastjson.JSONObject;
 import io.openmessaging.KeyValue;
 import io.openmessaging.internal.DefaultKeyValue;
-import org.apache.rocketmq.common.message.MessageQueue;
+import java.util.Set;
 import org.apache.rocketmq.replicator.config.*;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -28,13 +28,13 @@ import java.util.Map;
 
 public class DivideTaskByTopic extends TaskDivideStrategy {
 
-    public List<KeyValue> divide(Map<String, List<TaskTopicInfo>> topicRouteMap, TaskDivideConfig tdc) {
+    @Override public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> topicRouteMap, TaskDivideConfig tdc) {
 
         List<KeyValue> config = new ArrayList<KeyValue>();
         int parallelism = tdc.getTaskParallelism();
         int id = -1;
         Map<Integer, List<TaskTopicInfo>> taskTopicList = new HashMap<Integer, List<TaskTopicInfo>>();
-        for (Map.Entry<String, List<TaskTopicInfo>> entry : topicRouteMap.entrySet()) {
+        for (Map.Entry<String, Set<TaskTopicInfo>> entry : topicRouteMap.entrySet()) {
             int ind = ++id % parallelism;
             if (!taskTopicList.containsKey(ind)) {
                 taskTopicList.put(ind, new ArrayList<TaskTopicInfo>());
diff --git a/src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java b/src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java
index 6f58bb3..89ed060 100644
--- a/src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java
+++ b/src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.replicator.strategy;
 
 import io.openmessaging.KeyValue;
+import java.util.Set;
 import org.apache.rocketmq.replicator.config.TaskDivideConfig;
 import java.util.List;
 import java.util.Map;
@@ -24,5 +25,5 @@ import org.apache.rocketmq.replicator.config.TaskTopicInfo;
 
 public abstract class TaskDivideStrategy {
 
-    public abstract List<KeyValue> divide(Map<String, List<TaskTopicInfo>> topicMap, TaskDivideConfig tdc);
+    public abstract List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> topicMap, TaskDivideConfig tdc);
 }