You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:25 UTC
[rocketmq-connect] 15/39: Increase topic change awareness monitoring. resolve #398
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit ebb44c150bc43fe187c28b44e0b1915c2e901dfb
Author: xujianhai666 <ze...@bytedance.com>
AuthorDate: Sun Sep 22 10:26:09 2019 +0800
Increase topic change awareness monitoring. resolve #398
---
.../rocketmq/replicator/RmqSourceReplicator.java | 48 +++++++++++++++++++++-
1 file changed, 47 insertions(+), 1 deletion(-)
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
index e124f15..429102e 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
@@ -25,14 +25,18 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.commons.lang3.text.StrSubstitutor;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
@@ -78,9 +82,12 @@ public class RmqSourceReplicator extends SourceConnector {
private volatile boolean adminStarted;
+ private ScheduledExecutorService executor;
+
public RmqSourceReplicator() {
topicRouteMap = new HashMap<String, List<TaskTopicInfo>>();
whiteList = new HashSet<String>();
+ // Single daemon thread (named "RmqSourceReplicator-SourceWatcher-N") that runs the
+ // periodic task scheduled by startListner().
+ executor = Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("RmqSourceReplicator-SourceWatcher-%d").daemon(true).build());
}
private synchronized void startMQAdminTools() {
@@ -151,6 +158,44 @@ public class RmqSourceReplicator extends SourceConnector {
@Override
public void start() {
startMQAdminTools();
+ // Kick off the periodic route check that requests a task reconfiguration
+ // whenever the rebuilt topic route map differs from the previous one.
+ startListner();
+ }
+
+ /**
+  * Starts the periodic source-topic watcher.
+  *
+  * <p>After an initial 30-second delay, and every 30 seconds thereafter, the route map is
+  * rebuilt through {@code buildRoute()} and compared with the previous snapshot. When the
+  * two differ, a task reconfiguration is requested from the connector context.
+  *
+  * <p>A failed run never cancels the schedule: the previous snapshot is restored and the
+  * check is simply repeated on the next tick.
+  */
+ public void startListner() {
+     executor.scheduleAtFixedRate(new Runnable() {
+         @Override public void run() {
+             // Keep the last known-good routes so they can be restored if this run fails.
+             Map<String, List<TaskTopicInfo>> origin = topicRouteMap;
+             try {
+                 topicRouteMap = new HashMap<String, List<TaskTopicInfo>>();
+
+                 buildRoute();
+
+                 if (!compare(origin, topicRouteMap)) {
+                     context.requestTaskReconfiguration();
+                 }
+             } catch (RuntimeException e) {
+                 // scheduleAtFixedRate suppresses every later execution once a run throws,
+                 // which would silently stop topic-change monitoring for good. Swallow the
+                 // failure here and roll back to the previous snapshot so that a half-built
+                 // map is never kept and the next tick re-detects any pending change.
+                 // NOTE(review): no logger is visible in this section of the file; if the
+                 // class has one, log the exception here instead of dropping it.
+                 topicRouteMap = origin;
+             }
+         }
+     }, 30, 30, TimeUnit.SECONDS);
+ }
+
+ /**
+  * Tells whether two topic route maps describe the same routing.
+  *
+  * <p>The maps are treated as equal when they hold the same number of topics, every topic
+  * of {@code origin} is present in {@code updated}, and for each topic both task lists
+  * have the same size and the original list contains every updated task.
+  *
+  * @param origin the route map captured before the rebuild
+  * @param updated the freshly built route map
+  * @return {@code true} if no difference was found, {@code false} otherwise
+  */
+ public boolean compare(Map<String, List<TaskTopicInfo>> origin, Map<String, List<TaskTopicInfo>> updated) {
+     boolean sameTopicCount = origin.size() == updated.size();
+     if (!sameTopicCount) {
+         return false;
+     }
+
+     for (String topic : origin.keySet()) {
+         if (!updated.containsKey(topic)) {
+             return false;
+         }
+
+         List<TaskTopicInfo> before = origin.get(topic);
+         List<TaskTopicInfo> after = updated.get(topic);
+         boolean sameTasks = before.size() == after.size() && before.containsAll(after);
+         if (!sameTasks) {
+             return false;
+         }
+     }
+
+     return true;
}
public void stop() {
@@ -169,6 +214,7 @@ public class RmqSourceReplicator extends SourceConnector {
return RmqSourceTask.class;
}
+ @Override
public List<KeyValue> taskConfigs() {
if (!configValid) {
return new ArrayList<KeyValue>();