You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:37 UTC
[rocketmq-connect] 27/39: [ISSUE #478] TopicList is null exception and frequent requestTaskReconfiguration (#483)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit f55d2132317e94529a9e70fbcd896f1e42c51a33
Author: zhoubo <87...@qq.com>
AuthorDate: Thu Dec 19 09:24:00 2019 +0800
[ISSUE #478] TopicList is null exception and frequent requestTaskReconfiguration (#483)
* Fix the null TopicList exception and the overly frequent requestTaskReconfiguration calls
* https://github.com/apache/rocketmq-externals/issues/478
* Runtime: add some logging
* Fix a replicator bug that caused frequent requestTaskReconfiguration calls after a new topic was added
---
.../rocketmq/replicator/RmqSourceReplicator.java | 30 ++++++++++++++--------
.../apache/rocketmq/replicator/RmqSourceTask.java | 2 +-
.../strategy/DivideTaskByConsistentHash.java | 3 ++-
.../replicator/strategy/DivideTaskByQueue.java | 4 ++-
.../replicator/strategy/DivideTaskByTopic.java | 6 ++---
.../replicator/strategy/TaskDivideStrategy.java | 3 ++-
6 files changed, 30 insertions(+), 18 deletions(-)
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
index e868b89..82744ed 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
@@ -63,7 +63,7 @@ public class RmqSourceReplicator extends SourceConnector {
private RmqConnectorConfig replicatorConfig;
- private Map<String, List<TaskTopicInfo>> topicRouteMap;
+ private Map<String, Set<TaskTopicInfo>> topicRouteMap;
private volatile boolean configValid = false;
@@ -75,7 +75,7 @@ public class RmqSourceReplicator extends SourceConnector {
private ScheduledExecutorService executor;
public RmqSourceReplicator() {
- topicRouteMap = new HashMap<String, List<TaskTopicInfo>>();
+ topicRouteMap = new HashMap<>();
replicatorConfig = new RmqConnectorConfig();
executor = Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("RmqSourceReplicator-SourceWatcher-%d").daemon(true).build());
}
@@ -130,34 +130,42 @@ public class RmqSourceReplicator extends SourceConnector {
@Override
public void start() {
startMQAdminTools();
+ buildRoute();
startListner();
}
public void startListner() {
executor.scheduleAtFixedRate(new Runnable() {
+
+ boolean first = true;
+ Map<String, Set<TaskTopicInfo>> origin = null;
+
+
@Override public void run() {
- Map<String, List<TaskTopicInfo>> origin = topicRouteMap;
- topicRouteMap = new HashMap<String, List<TaskTopicInfo>>();
buildRoute();
-
+ if (first) {
+ origin = new HashMap<>(topicRouteMap);
+ first = false;
+ }
if (!compare(origin, topicRouteMap)) {
context.requestTaskReconfiguration();
+ origin = new HashMap<>(topicRouteMap);
}
}
}, replicatorConfig.getRefreshInterval(), replicatorConfig.getRefreshInterval(), TimeUnit.SECONDS);
}
- public boolean compare(Map<String, List<TaskTopicInfo>> origin, Map<String, List<TaskTopicInfo>> updated) {
+ public boolean compare(Map<String, Set<TaskTopicInfo>> origin, Map<String, Set<TaskTopicInfo>> updated) {
if (origin.size() != updated.size()) {
return false;
}
- for (Map.Entry<String, List<TaskTopicInfo>> entry : origin.entrySet()) {
+ for (Map.Entry<String, Set<TaskTopicInfo>> entry : origin.entrySet()) {
if (!updated.containsKey(entry.getKey())) {
return false;
}
- List<TaskTopicInfo> originTasks = entry.getValue();
- List<TaskTopicInfo> updateTasks = updated.get(entry.getKey());
+ Set<TaskTopicInfo> originTasks = entry.getValue();
+ Set<TaskTopicInfo> updateTasks = updated.get(entry.getKey());
if (originTasks.size() != updateTasks.size()) {
return false;
}
@@ -249,7 +257,7 @@ public class RmqSourceReplicator extends SourceConnector {
TopicRouteData topicRouteData = srcMQAdminExt.examineTopicRouteInfo(topic);
if (!topicRouteMap.containsKey(topic)) {
- topicRouteMap.put(topic, new ArrayList<TaskTopicInfo>());
+ topicRouteMap.put(topic, new HashSet<>(16));
}
for (QueueData qd : topicRouteData.getQueueDatas()) {
if (brokerNameSet.contains(qd.getBrokerName())) {
@@ -270,7 +278,7 @@ public class RmqSourceReplicator extends SourceConnector {
}
}
- public Map<String, List<TaskTopicInfo>> getTopicRouteMap() {
+ public Map<String, Set<TaskTopicInfo>> getTopicRouteMap() {
return this.topicRouteMap;
}
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index b504e85..3e8d78b 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -121,7 +121,7 @@ public class RmqSourceTask extends SourceTask {
log.info("RocketMQ source task started");
}
- public void stop() {
+ @Override public void stop() {
if (started) {
if (this.consumer != null) {
diff --git a/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByConsistentHash.java b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByConsistentHash.java
index b027246..1f01be0 100644
--- a/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByConsistentHash.java
+++ b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByConsistentHash.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.rocketmq.common.consistenthash.ConsistentHashRouter;
import org.apache.rocketmq.common.consistenthash.Node;
import org.apache.rocketmq.replicator.config.DataType;
@@ -31,7 +32,7 @@ import org.apache.rocketmq.replicator.config.TaskDivideConfig;
import org.apache.rocketmq.replicator.config.TaskTopicInfo;
public class DivideTaskByConsistentHash extends TaskDivideStrategy {
- @Override public List<KeyValue> divide(Map<String, List<TaskTopicInfo>> topicMap, TaskDivideConfig tdc) {
+ @Override public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> topicMap, TaskDivideConfig tdc) {
List<KeyValue> config = new ArrayList<>();
int parallelism = tdc.getTaskParallelism();
diff --git a/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
index 0a493de..e587250 100644
--- a/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
+++ b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
@@ -20,6 +20,7 @@ import com.alibaba.fastjson.JSONObject;
import io.openmessaging.KeyValue;
import io.openmessaging.internal.DefaultKeyValue;
import java.util.HashMap;
+import java.util.Set;
import org.apache.rocketmq.replicator.config.DataType;
import org.apache.rocketmq.replicator.config.TaskConfigEnum;
import org.apache.rocketmq.replicator.config.TaskDivideConfig;
@@ -29,7 +30,8 @@ import java.util.Map;
import org.apache.rocketmq.replicator.config.TaskTopicInfo;
public class DivideTaskByQueue extends TaskDivideStrategy {
- public List<KeyValue> divide(Map<String, List<TaskTopicInfo>> topicRouteMap, TaskDivideConfig tdc) {
+
+ @Override public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> topicRouteMap, TaskDivideConfig tdc) {
List<KeyValue> config = new ArrayList<KeyValue>();
int parallelism = tdc.getTaskParallelism();
diff --git a/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java
index f8dc8cf..77ea7cc 100644
--- a/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java
+++ b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java
@@ -19,7 +19,7 @@ package org.apache.rocketmq.replicator.strategy;
import com.alibaba.fastjson.JSONObject;
import io.openmessaging.KeyValue;
import io.openmessaging.internal.DefaultKeyValue;
-import org.apache.rocketmq.common.message.MessageQueue;
+import java.util.Set;
import org.apache.rocketmq.replicator.config.*;
import java.util.ArrayList;
import java.util.HashMap;
@@ -28,13 +28,13 @@ import java.util.Map;
public class DivideTaskByTopic extends TaskDivideStrategy {
- public List<KeyValue> divide(Map<String, List<TaskTopicInfo>> topicRouteMap, TaskDivideConfig tdc) {
+ @Override public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> topicRouteMap, TaskDivideConfig tdc) {
List<KeyValue> config = new ArrayList<KeyValue>();
int parallelism = tdc.getTaskParallelism();
int id = -1;
Map<Integer, List<TaskTopicInfo>> taskTopicList = new HashMap<Integer, List<TaskTopicInfo>>();
- for (Map.Entry<String, List<TaskTopicInfo>> entry : topicRouteMap.entrySet()) {
+ for (Map.Entry<String, Set<TaskTopicInfo>> entry : topicRouteMap.entrySet()) {
int ind = ++id % parallelism;
if (!taskTopicList.containsKey(ind)) {
taskTopicList.put(ind, new ArrayList<TaskTopicInfo>());
diff --git a/src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java b/src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java
index 6f58bb3..89ed060 100644
--- a/src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java
+++ b/src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.replicator.strategy;
import io.openmessaging.KeyValue;
+import java.util.Set;
import org.apache.rocketmq.replicator.config.TaskDivideConfig;
import java.util.List;
import java.util.Map;
@@ -24,5 +25,5 @@ import org.apache.rocketmq.replicator.config.TaskTopicInfo;
public abstract class TaskDivideStrategy {
- public abstract List<KeyValue> divide(Map<String, List<TaskTopicInfo>> topicMap, TaskDivideConfig tdc);
+ public abstract List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> topicMap, TaskDivideConfig tdc);
}