You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:47 UTC
[rocketmq-connect] 37/39: [rocketmq-replicator] Fix topic build route logic (#834)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit d1845c57cb47edb563879586a94a27c7feab7096
Author: Git_Yang <30...@users.noreply.github.com>
AuthorDate: Wed Oct 27 14:18:58 2021 +0800
[rocketmq-replicator] Fix topic build route logic (#834)
Signed-off-by: zhangyang21 <zh...@xiaomi.com>
---
.../rocketmq/replicator/RmqSourceReplicator.java | 65 ++++++++++++----------
1 file changed, 36 insertions(+), 29 deletions(-)
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
index 50f4a17..ecbedb6 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
@@ -228,37 +228,44 @@ public class RmqSourceReplicator extends SourceConnector {
TopicList topics = srcMQAdminExt.fetchAllTopicList();
for (String topic : topics.getTopicList()) {
- if ((syncRETRY && topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) ||
- (syncDLQ && topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) ||
- !topic.equals(ConfigDefine.CONN_STORE_TOPIC)) {
-
- for (Pattern pattern : patterns) {
- Matcher matcher = pattern.matcher(topic);
- if (matcher.matches()) {
- String targetTopic = generateTargetTopic(topic);
- if (!targetTopicSet.contains(targetTopic)) {
- ensureTargetTopic(topic, targetTopic);
- }
+ if (topic.equals(ConfigDefine.CONN_STORE_TOPIC)) {
+ continue;
+ }
- // different from BrokerData with cluster field, which can ensure the brokerData is from expected cluster.
- // QueueData use brokerName as unique info on cluster of rocketmq. so when we want to get QueueData of
- // expected cluster, we should get brokerNames of expected cluster, and then filter queueDatas.
- List<BrokerData> brokerList = Utils.examineBrokerData(this.srcMQAdminExt, topic, srcCluster);
- Set<String> brokerNameSet = new HashSet<String>();
- for (BrokerData b : brokerList) {
- brokerNameSet.add(b.getBrokerName());
- }
+ if (!syncRETRY && topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+ continue;
+ }
- TopicRouteData topicRouteData = srcMQAdminExt.examineTopicRouteInfo(topic);
- if (!topicRouteMap.containsKey(topic)) {
- topicRouteMap.put(topic, new HashSet<>(16));
- }
- for (QueueData qd : topicRouteData.getQueueDatas()) {
- if (brokerNameSet.contains(qd.getBrokerName())) {
- for (int i = 0; i < qd.getReadQueueNums(); i++) {
- TaskTopicInfo taskTopicInfo = new TaskTopicInfo(topic, qd.getBrokerName(), i, targetTopic);
- topicRouteMap.get(topic).add(taskTopicInfo);
- }
+ if (!syncDLQ && topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
+ continue;
+ }
+
+ for (Pattern pattern : patterns) {
+ Matcher matcher = pattern.matcher(topic);
+ if (matcher.matches()) {
+ String targetTopic = generateTargetTopic(topic);
+ if (!targetTopicSet.contains(targetTopic)) {
+ ensureTargetTopic(topic, targetTopic);
+ }
+
+ // Unlike BrokerData, whose cluster field ensures that the broker data comes from the expected cluster,
+ // QueueData uses brokerName as its unique identifier within a RocketMQ cluster. So to get the QueueData of
+ // the expected cluster, we first collect the broker names of that cluster and then filter the queue datas by them.
+ List<BrokerData> brokerList = Utils.examineBrokerData(this.srcMQAdminExt, topic, srcCluster);
+ Set<String> brokerNameSet = new HashSet<String>();
+ for (BrokerData b : brokerList) {
+ brokerNameSet.add(b.getBrokerName());
+ }
+
+ TopicRouteData topicRouteData = srcMQAdminExt.examineTopicRouteInfo(topic);
+ if (!topicRouteMap.containsKey(topic)) {
+ topicRouteMap.put(topic, new HashSet<>(16));
+ }
+ for (QueueData qd : topicRouteData.getQueueDatas()) {
+ if (brokerNameSet.contains(qd.getBrokerName())) {
+ for (int i = 0; i < qd.getReadQueueNums(); i++) {
+ TaskTopicInfo taskTopicInfo = new TaskTopicInfo(topic, qd.getBrokerName(), i, targetTopic);
+ topicRouteMap.get(topic).add(taskTopicInfo);
}
}
}