You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/18 04:00:49 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Init the remapping command

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new 25a588b  Init the remapping command
25a588b is described below

commit 25a588b81fd58dccb1e5f81e8a65859570a76a89
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Nov 18 12:00:15 2021 +0800

    Init the remapping command
---
 .../rocketmq/common/TopicQueueMappingOne.java      |  54 ++++++
 .../rocketmq/common/TopicQueueMappingUtils.java    |   6 +-
 ...nd.java => RemappingStaticTopicSubCommand.java} | 192 ++++++++++++---------
 .../command/topic/UpdateStaticTopicSubCommand.java |  38 ++--
 4 files changed, 190 insertions(+), 100 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingOne.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingOne.java
new file mode 100644
index 0000000..150a208
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingOne.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class TopicQueueMappingOne extends RemotingSerializable {
+
+    String topic; // redundant field
+    String bname;  //identify the hosted broker name
+    Integer globalId;
+    ImmutableList<LogicQueueMappingItem> items;
+
+    public TopicQueueMappingOne(String topic, String bname, Integer globalId, ImmutableList<LogicQueueMappingItem> items) {
+        this.topic = topic;
+        this.bname = bname;
+        this.globalId = globalId;
+        this.items = items;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public String getBname() {
+        return bname;
+    }
+
+    public Integer getGlobalId() {
+        return globalId;
+    }
+
+    public ImmutableList<LogicQueueMappingItem> getItems() {
+        return items;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java
index 686208a..ff89aaf 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java
@@ -103,7 +103,7 @@ public class TopicQueueMappingUtils {
         return new AbstractMap.SimpleImmutableEntry<Long, Integer>(epoch, queueNum);
     }
 
-    public static Map<Integer, ImmutableList<LogicQueueMappingItem>> buildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean replace) {
+    public static Map<Integer, TopicQueueMappingOne> buildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean replace) {
         Collections.sort(mappingDetailList, new Comparator<TopicQueueMappingDetail>() {
             @Override
             public int compare(TopicQueueMappingDetail o1, TopicQueueMappingDetail o2) {
@@ -111,7 +111,7 @@ public class TopicQueueMappingUtils {
             }
         });
 
-        Map<Integer, ImmutableList<LogicQueueMappingItem>> globalIdMap = new HashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
+        Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<Integer, TopicQueueMappingOne>();
         for (TopicQueueMappingDetail mappingDetail : mappingDetailList) {
             for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>>  entry : mappingDetail.getHostedQueues().entrySet()) {
                 Integer globalid = entry.getKey();
@@ -125,7 +125,7 @@ public class TopicQueueMappingUtils {
                         throw new RuntimeException(String.format("The queue id is duplicated in broker %s %s", leaderBrokerName, mappingDetail.getBname()));
                     }
                 } else {
-                    globalIdMap.put(globalid, entry.getValue());
+                    globalIdMap.put(globalid, new TopicQueueMappingOne(mappingDetail.topic, mappingDetail.bname, globalid, entry.getValue()));
                 }
             }
         }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
similarity index 52%
copy from tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
copy to tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
index 29a7261..4cc5acf 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
@@ -17,7 +17,6 @@
 package org.apache.rocketmq.tools.command.topic;
 
 import com.google.common.collect.ImmutableList;
-import com.sun.xml.internal.ws.api.BindingIDFactory;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionGroup;
@@ -26,6 +25,7 @@ import org.apache.rocketmq.common.LogicQueueMappingItem;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.TopicQueueMappingOne;
 import org.apache.rocketmq.common.TopicQueueMappingUtils;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.route.QueueData;
@@ -38,15 +38,17 @@ import org.apache.rocketmq.tools.command.SubCommand;
 import org.apache.rocketmq.tools.command.SubCommandException;
 
 import java.util.AbstractMap;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-public class UpdateStaticTopicSubCommand implements SubCommand {
+public class RemappingStaticTopicSubCommand implements SubCommand {
 
     @Override
     public String commandName() {
@@ -64,7 +66,10 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
 
         Option opt = null;
 
-        opt = new Option("c", "clusterName", true, "create topic to which cluster");
+        opt = new Option("c", "clusters", true, "remapping static topic to clusters, comma separated");
+        optionGroup.addOption(opt);
+
+        opt = new Option("b", "brokers", true, "remapping static topic to brokers, comma separated");
         optionGroup.addOption(opt);
 
         optionGroup.setRequired(true);
@@ -73,11 +78,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
         opt = new Option("t", "topic", true, "topic name");
         opt.setRequired(true);
         options.addOption(opt);
-
-        opt = new Option("qn", "totalQueueNum", true, "total queue num");
-        opt.setRequired(true);
-        options.addOption(opt);
-
         return options;
     }
 
@@ -90,33 +90,46 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
 
         ClientMetadata clientMetadata = new ClientMetadata();
         Map<String, TopicConfigAndQueueMapping> existedTopicConfigMap = new HashMap<>();
-        Map<Integer, ImmutableList<LogicQueueMappingItem>> globalIdMap = new HashMap<>();
+        Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<>();
+        Set<String> brokers = new HashSet<>();
+        Map.Entry<Long, Integer> maxEpochAndNum = null;
         try {
-            if (!commandLine.hasOption('t')
-                    || !commandLine.hasOption('c')
-                    || !commandLine.hasOption("qn")) {
+            if ((!commandLine.hasOption("b") && !commandLine.hasOption('c'))
+                    || !commandLine.hasOption('t')) {
                 ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
                 return;
             }
             String topic = commandLine.getOptionValue('t').trim();
-            int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim());
-            String clusters = commandLine.getOptionValue('c').trim();
+
             ClusterInfo clusterInfo  = defaultMQAdminExt.examineBrokerClusterInfo();
             if (clusterInfo == null
                     || clusterInfo.getClusterAddrTable().isEmpty()) {
                 throw new RuntimeException("The Cluster info is empty");
-            } else {
-                clientMetadata.refreshClusterInfo(clusterInfo);
             }
-            Set<String> brokers = new HashSet<>();
-            for (String cluster : clusters.split(",")) {
-                cluster = cluster.trim();
-                if (clusterInfo.getClusterAddrTable().get(cluster) != null) {
-                    brokers.addAll(clusterInfo.getClusterAddrTable().get(cluster));
+            clientMetadata.refreshClusterInfo(clusterInfo);
+
+            if (commandLine.hasOption("b")) {
+                String brokerStrs = commandLine.getOptionValue("b").trim();
+                for (String broker: brokerStrs.split(",")) {
+                    brokers.add(broker.trim());
+                }
+            } else if (commandLine.hasOption("c")) {
+                String clusters = commandLine.getOptionValue('c').trim();
+                for (String cluster : clusters.split(",")) {
+                    cluster = cluster.trim();
+                    if (clusterInfo.getClusterAddrTable().get(cluster) != null) {
+                        brokers.addAll(clusterInfo.getClusterAddrTable().get(cluster));
+                    }
                 }
             }
             if (brokers.isEmpty()) {
-                throw new RuntimeException("Find none brokers for " + clusters);
+                throw new RuntimeException("Find none brokers");
+            }
+            for (String broker : brokers) {
+                String addr = clientMetadata.findMasterBrokerAddr(broker);
+                if (addr == null) {
+                    throw new RuntimeException("Can't find addr for broker " + broker);
+                }
             }
 
             //get the existed topic config and mapping
@@ -135,93 +148,106 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
                 }
             }
 
-            Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), queueNum);
-            if (!existedTopicConfigMap.isEmpty()) {
-                //make sure it it not null
-                existedTopicConfigMap.forEach((key, value) -> {
-                    if (value.getMappingDetail() != null) {
-                        throw new RuntimeException("Mapping info should be null in broker " + key);
-                    }
-                });
-                //make sure the detail is not dirty
-                existedTopicConfigMap.forEach((key, value) -> {
-                    if (!key.equals(value.getMappingDetail().getBname())) {
-                        throw new RuntimeException(String.format("The broker name is not equal %s != %s ", key, value.getMappingDetail().getBname()));
-                    }
-                    if (value.getMappingDetail().isDirty()) {
-                        throw new RuntimeException("The mapping info is dirty in broker  " + value.getMappingDetail().getBname());
-                    }
-                });
-
-                List<TopicQueueMappingDetail> detailList = existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList());
-                //check the epoch and qnum
-                maxEpochAndNum = TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList);
-                final Map.Entry<Long, Integer> tmpMaxEpochAndNum = maxEpochAndNum;
-                detailList.forEach( mappingDetail -> {
-                    if (tmpMaxEpochAndNum.getKey() != mappingDetail.getEpoch()) {
-                        throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", tmpMaxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname()));
-                    }
-                    if (tmpMaxEpochAndNum.getValue() != mappingDetail.getTotalQueues()) {
-                        throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", tmpMaxEpochAndNum.getValue(), mappingDetail.getTotalQueues(), mappingDetail.getBname()));
-                    }
-                });
+            if (existedTopicConfigMap.isEmpty()) {
+                throw new RuntimeException("No topic route to do the remapping");
+            }
 
-                globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(detailList), false);
+            //make sure it it not null
+            existedTopicConfigMap.forEach((key, value) -> {
+                if (value.getMappingDetail() != null) {
+                    throw new RuntimeException("Mapping info should be null in broker " + key);
+                }
+            });
+            //make sure the detail is not dirty
+            existedTopicConfigMap.forEach((key, value) -> {
+                if (!key.equals(value.getMappingDetail().getBname())) {
+                    throw new RuntimeException(String.format("The broker name is not equal %s != %s ", key, value.getMappingDetail().getBname()));
+                }
+                if (value.getMappingDetail().isDirty()) {
+                    throw new RuntimeException("The mapping info is dirty in broker  " + value.getMappingDetail().getBname());
+                }
+            });
 
-                if (maxEpochAndNum.getValue() != globalIdMap.size()) {
-                    throw new RuntimeException(String.format("The total queue number in config dose not match the real hosted queues %d != %d", maxEpochAndNum.getValue(), globalIdMap.size()));
+            List<TopicQueueMappingDetail> detailList = existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList());
+            //check the epoch and qnum
+            maxEpochAndNum = TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList);
+            final Map.Entry<Long, Integer> tmpMaxEpochAndNum = maxEpochAndNum;
+            detailList.forEach( mappingDetail -> {
+                if (tmpMaxEpochAndNum.getKey() != mappingDetail.getEpoch()) {
+                    throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", tmpMaxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname()));
                 }
-                for (int i = 0; i < maxEpochAndNum.getValue(); i++) {
-                    if (!globalIdMap.containsKey(i)) {
-                        throw new RuntimeException(String.format("The queue number %s is not in globalIdMap", i));
-                    }
+                if (tmpMaxEpochAndNum.getValue() != mappingDetail.getTotalQueues()) {
+                    throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", tmpMaxEpochAndNum.getValue(), mappingDetail.getTotalQueues(), mappingDetail.getBname()));
                 }
+            });
+
+            globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(detailList), false);
+
+            if (maxEpochAndNum.getValue() != globalIdMap.size()) {
+                throw new RuntimeException(String.format("The total queue number in config dose not match the real hosted queues %d != %d", maxEpochAndNum.getValue(), globalIdMap.size()));
             }
-            if (queueNum < globalIdMap.size()) {
-                throw new RuntimeException(String.format("Cannot decrease the queue num for static topic %d < %d", queueNum, globalIdMap.size()));
-            }
-            //check the queue number
-            if (queueNum == globalIdMap.size()) {
-                throw new RuntimeException("The topic queue num is equal the existed queue num, do nothing");
+            for (int i = 0; i < maxEpochAndNum.getValue(); i++) {
+                if (!globalIdMap.containsKey(i)) {
+                    throw new RuntimeException(String.format("The queue number %s is not in globalIdMap", i));
+                }
             }
+
             //the check is ok, now do the mapping allocation
-            Map<String, Integer> brokerNumMap = brokers.stream().collect(Collectors.toMap( x -> x, x -> 0));
-            Map<Integer, String> idToBroker = new HashMap<>();
-            globalIdMap.forEach((key, value) -> {
-                String leaderbroker = TopicQueueMappingUtils.getLeaderBroker(value);
-                idToBroker.put(key, leaderbroker);
-                if (!brokerNumMap.containsKey(leaderbroker)) {
-                    brokerNumMap.put(leaderbroker, 1);
+            int maxNum = maxEpochAndNum.getValue();
+            TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<>(), brokers.stream().collect(Collectors.toMap( x -> x, x -> 0)));
+            allocator.upToNum(maxNum);
+            Map<String, Integer> expectedBrokerNumMap = allocator.getBrokerNumMap();
+            Queue<Integer> waitAssignQueues = new ArrayDeque<Integer>();
+            Map<Integer, String> expectedIdToBroker = new HashMap<>();
+            //the following logic will make sure that, for one broker, only "take in" or "take out" queues
+            //It can't, take in some queues but alse take out some queues.
+            globalIdMap.forEach((queueId, mappingOne) -> {
+                String leaderBroker = mappingOne.getBname();
+                if (expectedBrokerNumMap.containsKey(leaderBroker)) {
+                    if (expectedBrokerNumMap.get(leaderBroker) > 0) {
+                        expectedIdToBroker.put(queueId, leaderBroker);
+                        expectedBrokerNumMap.put(leaderBroker, expectedBrokerNumMap.get(leaderBroker) - 1);
+                    } else {
+                        waitAssignQueues.add(queueId);
+                        expectedBrokerNumMap.remove(leaderBroker);
+                    }
                 } else {
-                    brokerNumMap.put(leaderbroker, brokerNumMap.get(leaderbroker) + 1);
+                    waitAssignQueues.add(queueId);
                 }
             });
-            TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(idToBroker, brokerNumMap);
-            allocator.upToNum(queueNum);
-            Map<Integer, String> newIdToBroker = allocator.getIdToBroker();
+            expectedBrokerNumMap.forEach((broker, queueNum) -> {
+                for (int i = 0; i < queueNum; i++) {
+                    expectedIdToBroker.put(waitAssignQueues.poll(), broker);
+                }
+            });
+
+            Set<Broker>
+
+            //Now construct the remapping info
 
             //construct the topic configAndMapping
             long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
-            for (Map.Entry<Integer, String> e : newIdToBroker.entrySet()) {
+            for (Map.Entry<Integer, String> e : expectedIdToBroker.entrySet()) {
                 Integer queueId = e.getKey();
-                String value = e.getValue();
+                String broker = e.getValue();
                 if (globalIdMap.containsKey(queueId)) {
                     //ignore the exited
                     continue;
                 }
                 TopicConfigAndQueueMapping configMapping;
-                if (!existedTopicConfigMap.containsKey(value)) {
+                if (!existedTopicConfigMap.containsKey(broker)) {
                     TopicConfig topicConfig = new TopicConfig(topic, 1, 1);
-                    TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail(topic, queueNum, value, epoch);
+                    TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail(topic, 0, broker, epoch);
                     configMapping = new TopicConfigAndQueueMapping(topicConfig, mappingDetail);
+                    existedTopicConfigMap.put(broker, configMapping);
                 } else {
-                    configMapping = existedTopicConfigMap.get(value);
+                    configMapping = existedTopicConfigMap.get(broker);
                     configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
                     configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
                     configMapping.getMappingDetail().setEpoch(epoch);
-                    configMapping.getMappingDetail().setTotalQueues(queueNum);
+                    configMapping.getMappingDetail().setTotalQueues(0);
                 }
-                LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, value, 0, 0, -1, -1, -1);
+                LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1);
                 configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem));
             }
 
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index 29a7261..a1ff0b0 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -26,6 +26,7 @@ import org.apache.rocketmq.common.LogicQueueMappingItem;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.TopicQueueMappingOne;
 import org.apache.rocketmq.common.TopicQueueMappingUtils;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.route.QueueData;
@@ -64,7 +65,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
 
         Option opt = null;
 
-        opt = new Option("c", "clusterName", true, "create topic to which cluster");
+        opt = new Option("c", "clusters", true, "create topic to clusters, comma separated");
         optionGroup.addOption(opt);
 
         optionGroup.setRequired(true);
@@ -90,7 +91,9 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
 
         ClientMetadata clientMetadata = new ClientMetadata();
         Map<String, TopicConfigAndQueueMapping> existedTopicConfigMap = new HashMap<>();
-        Map<Integer, ImmutableList<LogicQueueMappingItem>> globalIdMap = new HashMap<>();
+        Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<>();
+        Set<String> brokers = new HashSet<>();
+
         try {
             if (!commandLine.hasOption('t')
                     || !commandLine.hasOption('c')
@@ -108,7 +111,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
             } else {
                 clientMetadata.refreshClusterInfo(clusterInfo);
             }
-            Set<String> brokers = new HashSet<>();
             for (String cluster : clusters.split(",")) {
                 cluster = cluster.trim();
                 if (clusterInfo.getClusterAddrTable().get(cluster) != null) {
@@ -118,6 +120,12 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
             if (brokers.isEmpty()) {
                 throw new RuntimeException("Find none brokers for " + clusters);
             }
+            for (String broker : brokers) {
+                String addr = clientMetadata.findMasterBrokerAddr(broker);
+                if (addr == null) {
+                    throw new RuntimeException("Can't find addr for broker " + broker);
+                }
+            }
 
             //get the existed topic config and mapping
             TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
@@ -188,7 +196,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
             Map<String, Integer> brokerNumMap = brokers.stream().collect(Collectors.toMap( x -> x, x -> 0));
             Map<Integer, String> idToBroker = new HashMap<>();
             globalIdMap.forEach((key, value) -> {
-                String leaderbroker = TopicQueueMappingUtils.getLeaderBroker(value);
+                String leaderbroker = value.getBname();
                 idToBroker.put(key, leaderbroker);
                 if (!brokerNumMap.containsKey(leaderbroker)) {
                     brokerNumMap.put(leaderbroker, 1);
@@ -204,27 +212,29 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
             long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
             for (Map.Entry<Integer, String> e : newIdToBroker.entrySet()) {
                 Integer queueId = e.getKey();
-                String value = e.getValue();
+                String broker = e.getValue();
                 if (globalIdMap.containsKey(queueId)) {
                     //ignore the exited
                     continue;
                 }
                 TopicConfigAndQueueMapping configMapping;
-                if (!existedTopicConfigMap.containsKey(value)) {
-                    TopicConfig topicConfig = new TopicConfig(topic, 1, 1);
-                    TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail(topic, queueNum, value, epoch);
-                    configMapping = new TopicConfigAndQueueMapping(topicConfig, mappingDetail);
+                if (!existedTopicConfigMap.containsKey(broker)) {
+                    configMapping = new TopicConfigAndQueueMapping(new TopicConfig(topic), new TopicQueueMappingDetail(topic, 0, broker, -1));
+                    configMapping.setWriteQueueNums(1);
+                    configMapping.setReadQueueNums(1);
+                    existedTopicConfigMap.put(broker, configMapping);
                 } else {
-                    configMapping = existedTopicConfigMap.get(value);
+                    configMapping = existedTopicConfigMap.get(broker);
                     configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
                     configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
-                    configMapping.getMappingDetail().setEpoch(epoch);
-                    configMapping.getMappingDetail().setTotalQueues(queueNum);
                 }
-                LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, value, 0, 0, -1, -1, -1);
+                LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1);
                 configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem));
             }
-
+            existedTopicConfigMap.values().forEach( configMapping -> {
+                configMapping.getMappingDetail().setEpoch(epoch);
+                configMapping.getMappingDetail().setTotalQueues(queueNum);
+            });
             //If some succeed, and others fail, it will cause inconsistent data
             for (Map.Entry<String, TopicConfigAndQueueMapping> entry : existedTopicConfigMap.entrySet()) {
                 String broker = entry.getKey();