You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/18 02:16:16 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Ignore the existing queueId in static topic creation

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new 45946d4  Ignore the existing queueId in static topic creation
45946d4 is described below

commit 45946d475697f0b78e4d3b31105f75bb76e7ee19
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Nov 18 10:15:44 2021 +0800

    Ignore the existing queueId in static topic creation
---
 .../command/topic/UpdateStaticTopicSubCommand.java    | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index 5cee8c8..29a7261 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.tools.command.topic;
 
 import com.google.common.collect.ImmutableList;
+import com.sun.xml.internal.ws.api.BindingIDFactory;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionGroup;
@@ -201,22 +202,28 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
 
             //construct the topic configAndMapping
             long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
-            newIdToBroker.forEach( (queueId, broker) -> {
+            for (Map.Entry<Integer, String> e : newIdToBroker.entrySet()) {
+                Integer queueId = e.getKey();
+                String value = e.getValue();
+                if (globalIdMap.containsKey(queueId)) {
+                    //ignore the existing queueId
+                    continue;
+                }
                 TopicConfigAndQueueMapping configMapping;
-                if (!existedTopicConfigMap.containsKey(broker)) {
+                if (!existedTopicConfigMap.containsKey(value)) {
                     TopicConfig topicConfig = new TopicConfig(topic, 1, 1);
-                    TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail(topic, queueNum, broker, epoch);
+                    TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail(topic, queueNum, value, epoch);
                     configMapping = new TopicConfigAndQueueMapping(topicConfig, mappingDetail);
                 } else {
-                    configMapping = existedTopicConfigMap.get(broker);
+                    configMapping = existedTopicConfigMap.get(value);
                     configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
                     configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
                     configMapping.getMappingDetail().setEpoch(epoch);
                     configMapping.getMappingDetail().setTotalQueues(queueNum);
                 }
-                LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1);
+                LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, value, 0, 0, -1, -1, -1);
                 configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem));
-            });
+            }
 
             //If some succeed, and others fail, it will cause inconsistent data
             for (Map.Entry<String, TopicConfigAndQueueMapping> entry : existedTopicConfigMap.entrySet()) {