You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:47 UTC

[rocketmq-connect] 37/39: [rocketmq-replicator] Fix topic build route logic (#834)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit d1845c57cb47edb563879586a94a27c7feab7096
Author: Git_Yang <30...@users.noreply.github.com>
AuthorDate: Wed Oct 27 14:18:58 2021 +0800

    [rocketmq-replicator] Fix topic build route logic (#834)
    
    Signed-off-by: zhangyang21 <zh...@xiaomi.com>
---
 .../rocketmq/replicator/RmqSourceReplicator.java   | 65 ++++++++++++----------
 1 file changed, 36 insertions(+), 29 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
index 50f4a17..ecbedb6 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
@@ -228,37 +228,44 @@ public class RmqSourceReplicator extends SourceConnector {
 
             TopicList topics = srcMQAdminExt.fetchAllTopicList();
             for (String topic : topics.getTopicList()) {
-                if ((syncRETRY && topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) ||
-                    (syncDLQ && topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) ||
-                    !topic.equals(ConfigDefine.CONN_STORE_TOPIC)) {
-
-                    for (Pattern pattern : patterns) {
-                        Matcher matcher = pattern.matcher(topic);
-                        if (matcher.matches()) {
-                            String targetTopic = generateTargetTopic(topic);
-                            if (!targetTopicSet.contains(targetTopic)) {
-                                ensureTargetTopic(topic, targetTopic);
-                            }
+                if (topic.equals(ConfigDefine.CONN_STORE_TOPIC)) {
+                    continue;
+                }
 
-                            // different from BrokerData with cluster field, which can ensure the brokerData is from expected cluster.
-                            // QueueData use brokerName as unique info on cluster of rocketmq. so when we want to get QueueData of
-                            // expected cluster, we should get brokerNames of expected cluster, and then filter queueDatas.
-                            List<BrokerData> brokerList = Utils.examineBrokerData(this.srcMQAdminExt, topic, srcCluster);
-                            Set<String> brokerNameSet = new HashSet<String>();
-                            for (BrokerData b : brokerList) {
-                                brokerNameSet.add(b.getBrokerName());
-                            }
+                if (!syncRETRY && topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+                    continue;
+                }
 
-                            TopicRouteData topicRouteData = srcMQAdminExt.examineTopicRouteInfo(topic);
-                            if (!topicRouteMap.containsKey(topic)) {
-                                topicRouteMap.put(topic, new HashSet<>(16));
-                            }
-                            for (QueueData qd : topicRouteData.getQueueDatas()) {
-                                if (brokerNameSet.contains(qd.getBrokerName())) {
-                                    for (int i = 0; i < qd.getReadQueueNums(); i++) {
-                                        TaskTopicInfo taskTopicInfo = new TaskTopicInfo(topic, qd.getBrokerName(), i, targetTopic);
-                                        topicRouteMap.get(topic).add(taskTopicInfo);
-                                    }
+                if (!syncDLQ && topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
+                    continue;
+                }
+
+                for (Pattern pattern : patterns) {
+                    Matcher matcher = pattern.matcher(topic);
+                    if (matcher.matches()) {
+                        String targetTopic = generateTargetTopic(topic);
+                        if (!targetTopicSet.contains(targetTopic)) {
+                            ensureTargetTopic(topic, targetTopic);
+                        }
+
+                        // Unlike BrokerData, which has a cluster field that ensures the brokerData is from the expected cluster,
+                        // QueueData uses brokerName as its unique identifier within a RocketMQ cluster. So to get the QueueData of
+                        // the expected cluster, we first collect that cluster's brokerNames and then filter the queueDatas by them.
+                        List<BrokerData> brokerList = Utils.examineBrokerData(this.srcMQAdminExt, topic, srcCluster);
+                        Set<String> brokerNameSet = new HashSet<String>();
+                        for (BrokerData b : brokerList) {
+                            brokerNameSet.add(b.getBrokerName());
+                        }
+
+                        TopicRouteData topicRouteData = srcMQAdminExt.examineTopicRouteInfo(topic);
+                        if (!topicRouteMap.containsKey(topic)) {
+                            topicRouteMap.put(topic, new HashSet<>(16));
+                        }
+                        for (QueueData qd : topicRouteData.getQueueDatas()) {
+                            if (brokerNameSet.contains(qd.getBrokerName())) {
+                                for (int i = 0; i < qd.getReadQueueNums(); i++) {
+                                    TaskTopicInfo taskTopicInfo = new TaskTopicInfo(topic, qd.getBrokerName(), i, targetTopic);
+                                    topicRouteMap.get(topic).add(taskTopicInfo);
                                 }
                             }
                         }