You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/23 07:00:23 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Polish the code structure for static topic command

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new 2f4bf29  Polish the code structure for static topic command
2f4bf29 is described below

commit 2f4bf29603aec75be48aa4bd7d2570b10ed3bf2c
Author: dongeforever <do...@apache.org>
AuthorDate: Tue Nov 23 15:00:01 2021 +0800

    Polish the code structure for static topic command
---
 .../common/statictopic/TopicQueueMappingUtils.java | 101 +++++++++++-
 .../statictopic/TopicRemappingDetailWrapper.java   |  18 +--
 .../rocketmq/tools/admin/DefaultMQAdminExt.java    |  11 +-
 .../tools/admin/DefaultMQAdminExtImpl.java         |  62 +++++++-
 .../apache/rocketmq/tools/admin/MQAdminExt.java    |   9 +-
 .../topic/RemappingStaticTopicSubCommand.java      | 173 ++-------------------
 .../command/topic/UpdateStaticTopicSubCommand.java |  92 ++---------
 7 files changed, 203 insertions(+), 263 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index dc88876..e83ed2a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -19,18 +19,19 @@ package org.apache.rocketmq.common.statictopic;
 import com.google.common.collect.ImmutableList;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.rpc.ClientMetadata;
-import org.apache.rocketmq.remoting.exception.RemotingException;
 
 import java.io.File;
 import java.util.AbstractMap;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Random;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -311,7 +312,7 @@ public class TopicQueueMappingUtils {
         }
     }
 
-    public Map<String, TopicConfigAndQueueMapping> createTopicConfigMapping(String topic, int queueNum, Set<String> targetBrokers, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
+    public static TopicRemappingDetailWrapper createTopicConfigMapping(String topic, int queueNum, Set<String> targetBrokers, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
         Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<Integer, TopicQueueMappingOne>();
         Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<Long, Integer>(System.currentTimeMillis(), queueNum);
         if (!brokerConfigMap.isEmpty()) {
@@ -379,7 +380,99 @@ public class TopicQueueMappingUtils {
         TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
         TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
 
-        return brokerConfigMap;
+        return new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, newEpoch, brokerConfigMap, new HashSet<String>(), new HashSet<String>());
+    }
+
+
+    /**
+     * Plans the remapping of a static topic so that its logic queues are led by {@code targetBrokers}.
+     * A queue whose leader is a target broker with remaining quota stays put; every other queue gets a
+     * new leader item appended to its mapping.
+     *
+     * @param brokerConfigMap per-broker config and mapping; checked, mutated in place and checked again
+     * @return the remapping detail: new epoch, updated configs, brokers to map in and to map out
+     */
+    public static TopicRemappingDetailWrapper remappingStaticTopic(String topic, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, Set<String> targetBrokers) {
+        final Map.Entry<Long, Integer> maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
+        final Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
+        //the check is ok, now do the mapping allocation: expected number of queues per target broker
+        int maxNum = maxEpochAndNum.getValue();
+        Map<String, Integer> brokerNumMap = new HashMap<String, Integer>();
+        for (String broker: targetBrokers) {
+            brokerNumMap.put(broker, 0);
+        }
+        TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<Integer, String>(), brokerNumMap);
+        allocator.upToNum(maxNum);
+        Map<String, Integer> expectedBrokerNumMap = allocator.getBrokerNumMap();
+        Queue<Integer> waitAssignQueues = new ArrayDeque<Integer>();
+        Map<Integer, String> expectedIdToBroker = new HashMap<Integer, String>();
+        //the following logic makes sure that one broker either "maps in" or "maps out" queues, never both:
+        //a broker whose quota is used up is removed from the map, so it cannot receive waiting queues.
+        for (Map.Entry<Integer, TopicQueueMappingOne> entry : globalIdMap.entrySet()) {
+            Integer queueId = entry.getKey();
+            TopicQueueMappingOne mappingOne = entry.getValue();
+            String leaderBroker = mappingOne.getBname();
+            if (expectedBrokerNumMap.containsKey(leaderBroker)) {
+                if (expectedBrokerNumMap.get(leaderBroker) > 0) {
+                    expectedIdToBroker.put(queueId, leaderBroker);
+                    expectedBrokerNumMap.put(leaderBroker, expectedBrokerNumMap.get(leaderBroker) - 1);
+                } else {
+                    waitAssignQueues.add(queueId);
+                    expectedBrokerNumMap.remove(leaderBroker);
+                }
+            } else {
+                waitAssignQueues.add(queueId);
+            }
+        }
+
+        for (Map.Entry<String, Integer> entry: expectedBrokerNumMap.entrySet()) {
+            String broker = entry.getKey();
+            Integer queueNum = entry.getValue();
+            for (int i = 0; i < queueNum; i++) {
+                Integer queueId = waitAssignQueues.poll();
+                assert queueId != null;
+                expectedIdToBroker.put(queueId, broker);
+            }
+        }
+        long newEpoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
+        //Now construct the remapping info for every queue whose expected broker differs from its leader
+        Set<String> brokersToMapOut = new HashSet<String>();
+        Set<String> brokersToMapIn = new HashSet<String>();
+        for (Map.Entry<Integer, String> mapEntry : expectedIdToBroker.entrySet()) {
+            Integer queueId = mapEntry.getKey();
+            String broker = mapEntry.getValue();
+            TopicQueueMappingOne topicQueueMappingOne = globalIdMap.get(queueId);
+            assert topicQueueMappingOne != null;
+            if (topicQueueMappingOne.getBname().equals(broker)) {
+                continue;
+            }
+            //remapping; NOTE(review): assumes brokerConfigMap holds an entry for both brokers - confirm
+            final String mapInBroker = broker;
+            final String mapOutBroker = topicQueueMappingOne.getBname();
+            brokersToMapIn.add(mapInBroker);
+            brokersToMapOut.add(mapOutBroker);
+            TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(mapInBroker);
+            TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(mapOutBroker);
+            //a new physical queue is appended on the map-in broker: grow write and read counts by one each
+            mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums() + 1);
+            mapInConfig.setReadQueueNums(mapInConfig.getReadQueueNums() + 1);
+            List<LogicQueueMappingItem> items = new ArrayList<LogicQueueMappingItem>(topicQueueMappingOne.getItems());
+            LogicQueueMappingItem last = items.get(items.size() - 1);
+            items.add(new LogicQueueMappingItem(last.getGen() + 1, mapInConfig.getWriteQueueNums() - 1, mapInBroker, -1, 0, -1, -1, -1));
+            ImmutableList<LogicQueueMappingItem> resultItems = ImmutableList.copyOf(items);
+            //Use the same list object for the map-in and the map-out broker
+            mapInConfig.getMappingDetail().putMappingInfo(queueId, resultItems);
+            mapOutConfig.getMappingDetail().putMappingInfo(queueId, resultItems);
+        }
+        for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
+            TopicConfigAndQueueMapping configMapping = entry.getValue();
+            configMapping.getMappingDetail().setEpoch(newEpoch);
+            configMapping.getMappingDetail().setTotalQueues(maxNum);
+        }
+        //double check: run the same consistency checks again on the mutated configs
+        TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
+        TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<TopicQueueMappingDetail>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
+        return new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_REMAPPING, newEpoch, brokerConfigMap, brokersToMapIn, brokersToMapOut);
     }
 
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java
index a10dba3..80742dc 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java
@@ -18,7 +18,6 @@ public class TopicRemappingDetailWrapper extends RemotingSerializable {
     private final String topic;
     private final String type;
     private final long epoch;
-    private Map<Integer, String> expectedIdToBroker = new HashMap<Integer, String>();
 
     private Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
 
@@ -26,12 +25,13 @@ public class TopicRemappingDetailWrapper extends RemotingSerializable {
 
     private Set<String> brokerToMapOut = new HashSet<String>();
 
-    public TopicRemappingDetailWrapper(String topic, String type, long epoch, Map<Integer, String> expectedIdToBroker, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
+    public TopicRemappingDetailWrapper(String topic, String type, long epoch, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, Set<String> brokerToMapIn, Set<String> brokerToMapOut) {
         this.topic = topic;
         this.type = type;
         this.epoch = epoch;
-        this.expectedIdToBroker = expectedIdToBroker;
         this.brokerConfigMap = brokerConfigMap;
+        this.brokerToMapIn = brokerToMapIn;
+        this.brokerToMapOut = brokerToMapOut;
     }
 
     public String getTopic() {
@@ -46,10 +46,6 @@ public class TopicRemappingDetailWrapper extends RemotingSerializable {
         return epoch;
     }
 
-    public Map<Integer, String> getExpectedIdToBroker() {
-        return expectedIdToBroker;
-    }
-
     public Map<String, TopicConfigAndQueueMapping> getBrokerConfigMap() {
         return brokerConfigMap;
     }
@@ -58,15 +54,7 @@ public class TopicRemappingDetailWrapper extends RemotingSerializable {
         return brokerToMapIn;
     }
 
-    public void setBrokerToMapIn(Set<String> brokerToMapIn) {
-        this.brokerToMapIn = brokerToMapIn;
-    }
-
     public Set<String> getBrokerToMapOut() {
         return brokerToMapOut;
     }
-
-    public void setBrokerToMapOut(Set<String> brokerToMapOut) {
-        this.brokerToMapOut = brokerToMapOut;
-    }
 }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 3ae0f8c..1913612 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -17,7 +17,6 @@
 package org.apache.rocketmq.tools.admin;
 
 import java.io.UnsupportedEncodingException;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -29,7 +28,6 @@ import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.AclConfig;
 import org.apache.rocketmq.common.PlainAccessConfig;
 import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.rpc.ClientMetadata;
 import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
@@ -225,8 +223,8 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
     }
 
     @Override
-    public Map<String, TopicConfigAndQueueMapping> getTopicConfigMap(ClientMetadata clientMetadata, String topic) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        return this.defaultMQAdminExtImpl.getTopicConfigMap(clientMetadata, topic);
+    public Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        return this.defaultMQAdminExtImpl.examineTopicConfigAll(clientMetadata, topic);
     }
 
     @Override
@@ -674,6 +672,11 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
         this.defaultMQAdminExtImpl.createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force);
     }
 
+    @Override
+    public void remappingStaticTopic(ClientMetadata clientMetadata, String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, int blockSeqSize, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        this.defaultMQAdminExtImpl.remappingStaticTopic(clientMetadata, topic, brokersToMapIn, brokersToMapOut, brokerConfigMap, blockSeqSize, force);
+    }
+
 
     @Override public void migrateTopicLogicalQueueNotify(String brokerAddr,
         LogicalQueueRouteData fromQueueRouteData,
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 31be976..498e835 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -1110,8 +1110,68 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         this.mqClientInstance.getMQAdminImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force);
     }
 
+
+    @Override
+    public void remappingStaticTopic(ClientMetadata clientMetadata, String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, int blockSeqSize, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        for (String broker : brokerConfigMap.keySet()) {
+            String addr = clientMetadata.findMasterBrokerAddr(broker);
+            if (addr == null) {
+                throw new RuntimeException("Can't find addr for broker " + broker);
+            }
+        }
+        // every broker in brokerConfigMap has a master address, now do the remapping
+        //Step1: push the config to the map-in brokers so the new leaders can be written to without a logicOffset
+        for (String broker: brokersToMapIn) {
+            String addr = clientMetadata.findMasterBrokerAddr(broker);
+            TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
+            createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
+        }
+        //Step2: push the config to the map-out brokers to forbid writes on the old leaders
+        for (String broker: brokersToMapOut) {
+            String addr = clientMetadata.findMasterBrokerAddr(broker);
+            TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
+            createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
+        }
+        //Step3: decide each new leader's logic offset from the old leader's max offset, rounded up by blockSeqSize
+        for (String broker: brokersToMapOut) {
+            String addr = clientMetadata.findMasterBrokerAddr(broker);
+            TopicStatsTable statsTable = examineTopicStats(addr, topic);
+            TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(broker);
+            for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry : mapOutConfig.getMappingDetail().getHostedQueues().entrySet()) {
+                ImmutableList<LogicQueueMappingItem> items = entry.getValue();
+                Integer globalId = entry.getKey();
+                if (items.size() < 2) {
+                    continue;
+                }
+                LogicQueueMappingItem newLeader = items.get(items.size() - 1);
+                LogicQueueMappingItem oldLeader = items.get(items.size() - 2);
+                if (newLeader.getLogicOffset()  > 0) {
+                    continue;
+                }
+                TopicOffset topicOffset = statsTable.getOffsetTable().get(new MessageQueue(topic, oldLeader.getBname(), oldLeader.getQueueId()));
+                if (topicOffset == null) {
+                    throw new RuntimeException("Cannot get the max offset for old leader " + oldLeader);
+                }
+                //TODO check whether the max offset can be -1 here; the guard below only compares it with the start offset
+                if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) {
+                    throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset());
+                }
+                newLeader.setLogicOffset(TopicQueueMappingUtils.blockSeqRoundUp(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset()), blockSeqSize));
+                TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(newLeader.getBname());
+                //refresh the mapping items held by the new leader's broker config
+                mapInConfig.getMappingDetail().putMappingInfo(globalId, items);
+            }
+        }
+        //Step4: push the config, now carrying the logic offsets, to the map-in brokers again
+        for (String broker: brokersToMapIn) {
+            String addr = clientMetadata.findMasterBrokerAddr(broker);
+            TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
+            createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
+        }
+    }
+
     @Override
-    public Map<String, TopicConfigAndQueueMapping> getTopicConfigMap(ClientMetadata clientMetadata, String topic) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+    public Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         TopicRouteData routeData = examineTopicRouteInfo(topic);
         clientMetadata.freshTopicRoute(topic, routeData);
         Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index f94b90d..f01ccc3 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -21,13 +21,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+
+import com.google.common.collect.ImmutableList;
 import org.apache.rocketmq.client.MQAdmin;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.AclConfig;
 import org.apache.rocketmq.common.PlainAccessConfig;
 import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.admin.TopicOffset;
 import org.apache.rocketmq.common.rpc.ClientMetadata;
+import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
 import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
 import org.apache.rocketmq.common.admin.ConsumeStats;
@@ -56,6 +60,7 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
 import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
 import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
@@ -342,9 +347,11 @@ public interface MQAdminExt extends MQAdmin {
     void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, MQBrokerException,
             InterruptedException, MQClientException;
 
-    Map<String, TopicConfigAndQueueMapping> getTopicConfigMap(ClientMetadata clientMetadata, String topic) throws RemotingException, MQBrokerException,
+    Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, MQBrokerException,
             InterruptedException, MQClientException;
 
+    void remappingStaticTopic(ClientMetadata clientMetadata, String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, int blockSeqSize, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
+
     void migrateTopicLogicalQueueNotify(String brokerAddr, LogicalQueueRouteData fromQueueRouteData,
         LogicalQueueRouteData toQueueRouteData) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException;
 }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
index aa6d134..019fad5 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
@@ -40,6 +40,7 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.apache.rocketmq.tools.command.SubCommand;
 import org.apache.rocketmq.tools.command.SubCommandException;
 
+import java.util.AbstractMap;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -121,7 +122,13 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
             if (commandLine.hasOption("fr") && Boolean.parseBoolean(commandLine.getOptionValue("fr").trim())) {
                 force = true;
             }
-            doRemapping(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt, force);
+            for (String broker : wrapper.getBrokerConfigMap().keySet()) {
+                String addr = clientMetadata.findMasterBrokerAddr(broker);
+                if (addr == null) {
+                    throw new RuntimeException("Can't find addr for broker " + broker);
+                }
+            }
+            defaultMQAdminExt.remappingStaticTopic(clientMetadata, topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), 10000, force);
             return;
         }catch (Exception e) {
             throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
@@ -131,68 +138,9 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
     }
 
 
-
-    public void doRemapping(String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap,
-                            ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt, boolean force) throws Exception {
-        // now do the remapping
-        //Step1: let the new leader can be write without the logicOffset
-        for (String broker: brokersToMapIn) {
-            String addr = clientMetadata.findMasterBrokerAddr(broker);
-            TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
-            defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
-        }
-        //Step2: forbid the write of old leader
-        for (String broker: brokersToMapOut) {
-            String addr = clientMetadata.findMasterBrokerAddr(broker);
-            TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
-            defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
-        }
-        //Step3: decide the logic offset
-        for (String broker: brokersToMapOut) {
-            String addr = clientMetadata.findMasterBrokerAddr(broker);
-            TopicStatsTable statsTable = defaultMQAdminExt.examineTopicStats(addr, topic);
-            TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(broker);
-            for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry : mapOutConfig.getMappingDetail().getHostedQueues().entrySet()) {
-                ImmutableList<LogicQueueMappingItem> items = entry.getValue();
-                Integer globalId = entry.getKey();
-                if (items.size() < 2) {
-                    continue;
-                }
-                LogicQueueMappingItem newLeader = items.get(items.size() - 1);
-                LogicQueueMappingItem oldLeader = items.get(items.size() - 2);
-                if (newLeader.getLogicOffset()  > 0) {
-                    continue;
-                }
-                TopicOffset topicOffset = statsTable.getOffsetTable().get(new MessageQueue(topic, oldLeader.getBname(), oldLeader.getQueueId()));
-                if (topicOffset == null) {
-                    throw new RuntimeException("Cannot get the max offset for old leader " + oldLeader);
-                }
-                //TODO check the max offset, will it return -1?
-                if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) {
-                    throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset());
-                }
-                newLeader.setLogicOffset(TopicQueueMappingUtils.blockSeqRoundUp(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset()), 10000));
-                TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(newLeader.getBname());
-                //fresh the new leader
-                mapInConfig.getMappingDetail().putMappingInfo(globalId, items);
-            }
-        }
-        //Step4: write to the new leader with logic offset
-        for (String broker: brokersToMapIn) {
-            String addr = clientMetadata.findMasterBrokerAddr(broker);
-            TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
-            defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
-        }
-    }
-
-
-
-
-
     @Override
     public void execute(final CommandLine commandLine, final Options options,
                         RPCHook rpcHook) throws SubCommandException {
-
         if (!commandLine.hasOption('t')) {
             ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
             return;
@@ -250,120 +198,25 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
                 }
             }
 
-            //get the existed topic config and mapping
-            {
-                TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
-                clientMetadata.freshTopicRoute(topic, routeData);
-
-                if (routeData != null
-                        && !routeData.getQueueDatas().isEmpty()) {
-                    for (QueueData queueData: routeData.getQueueDatas()) {
-                        String bname = queueData.getBrokerName();
-                        String addr = clientMetadata.findMasterBrokerAddr(bname);
-                        TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
-                        //allow the config is null
-                        if (mapping != null) {
-                            brokerConfigMap.put(bname, mapping);
-                        }
-                    }
-                }
-            }
-
+            brokerConfigMap  = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
             if (brokerConfigMap.isEmpty()) {
                 throw new RuntimeException("No topic route to do the remapping");
             }
-
-            final Map.Entry<Long, Integer> maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
-            globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
-            //the check is ok, now do the mapping allocation
-            int maxNum = maxEpochAndNum.getValue();
-            long maxEpoch = maxEpochAndNum.getKey();
-
+            Map.Entry<Long, Integer> maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
             {
-                TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_REMAPPING, maxEpoch, new HashMap<>(), brokerConfigMap);
+                TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, maxEpochAndNum.getKey(), brokerConfigMap, new HashSet<String>(), new HashSet<String>());
                 String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, false);
                 System.out.println("The old mapping data is written to file " + oldMappingDataFile);
             }
 
-            TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<>(), targetBrokers.stream().collect(Collectors.toMap( x -> x, x -> 0)));
-            allocator.upToNum(maxNum);
-            Map<String, Integer> expectedBrokerNumMap = allocator.getBrokerNumMap();
-            Queue<Integer> waitAssignQueues = new ArrayDeque<Integer>();
-            Map<Integer, String> expectedIdToBroker = new HashMap<>();
-            //the following logic will make sure that, for one broker, either "map in" or "map out"
-            //It can't both,  map in some queues but also map out some queues.
-            globalIdMap.forEach((queueId, mappingOne) -> {
-                String leaderBroker = mappingOne.getBname();
-                if (expectedBrokerNumMap.containsKey(leaderBroker)) {
-                    if (expectedBrokerNumMap.get(leaderBroker) > 0) {
-                        expectedIdToBroker.put(queueId, leaderBroker);
-                        expectedBrokerNumMap.put(leaderBroker, expectedBrokerNumMap.get(leaderBroker) - 1);
-                    } else {
-                        waitAssignQueues.add(queueId);
-                        expectedBrokerNumMap.remove(leaderBroker);
-                    }
-                } else {
-                    waitAssignQueues.add(queueId);
-                }
-            });
-            expectedBrokerNumMap.forEach((broker, queueNum) -> {
-                for (int i = 0; i < queueNum; i++) {
-                    Integer queueId = waitAssignQueues.poll();
-                    assert queueId != null;
-                    expectedIdToBroker.put(queueId, broker);
-                }
-            });
-            long newEpoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
-
-            //Now construct the remapping info
-            Set<String> brokersToMapOut = new HashSet<>();
-            Set<String> brokersToMapIn = new HashSet<>();
-            for (Map.Entry<Integer, String> mapEntry : expectedIdToBroker.entrySet()) {
-                Integer queueId = mapEntry.getKey();
-                String broker = mapEntry.getValue();
-                TopicQueueMappingOne topicQueueMappingOne = globalIdMap.get(queueId);
-                assert topicQueueMappingOne != null;
-                if (topicQueueMappingOne.getBname().equals(broker)) {
-                    continue;
-                }
-                //remapping
-                String mapInBroker = broker;
-                String mapOutBroker = topicQueueMappingOne.getBname();
-                brokersToMapIn.add(mapInBroker);
-                brokersToMapOut.add(mapOutBroker);
-                TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(mapInBroker);
-                TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(mapOutBroker);
-
-                mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums() + 1);
-                mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums() + 1);
-
-                List<LogicQueueMappingItem> items = new ArrayList<>(topicQueueMappingOne.getItems());
-                LogicQueueMappingItem last = items.get(items.size() - 1);
-                items.add(new LogicQueueMappingItem(last.getGen() + 1, mapInConfig.getWriteQueueNums() - 1, mapInBroker, -1, 0, -1, -1, -1));
-
-                ImmutableList<LogicQueueMappingItem> resultItems = ImmutableList.copyOf(items);
-                //Use the same object
-                mapInConfig.getMappingDetail().putMappingInfo(queueId, resultItems);
-                mapOutConfig.getMappingDetail().putMappingInfo(queueId, resultItems);
-            }
-
-            brokerConfigMap.values().forEach(configMapping -> {
-                configMapping.getMappingDetail().setEpoch(newEpoch);
-                configMapping.getMappingDetail().setTotalQueues(maxNum);
-            });
-            //double check
-            TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
-            TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(brokerConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList())), false, true);
+            TopicRemappingDetailWrapper newWrapper = TopicQueueMappingUtils.remappingStaticTopic(topic, brokerConfigMap, targetBrokers);
 
             {
-                TopicRemappingDetailWrapper newWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_REMAPPING, newEpoch, expectedIdToBroker, brokerConfigMap);
-                newWrapper.setBrokerToMapIn(brokersToMapIn);
-                newWrapper.setBrokerToMapOut(brokersToMapOut);
                 String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, true);
                 System.out.println("The old mapping data is written to file " + newMappingDataFile);
             }
 
-            doRemapping(topic, brokersToMapIn, brokersToMapOut, brokerConfigMap, clientMetadata, defaultMQAdminExt, false);
+            defaultMQAdminExt.remappingStaticTopic(clientMetadata, topic, newWrapper.getBrokerToMapIn(), newWrapper.getBrokerToMapOut(), newWrapper.getBrokerConfigMap(), 10000, false);
 
         } catch (Exception e) {
             throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index cdb4fc0..5a4fc04 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -120,6 +120,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
                 throw new RuntimeException("The Cluster info is empty");
             }
             clientMetadata.refreshClusterInfo(clusterInfo);
+
             doUpdate(wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt, force);
             return;
         }catch (Exception e) {
@@ -130,6 +131,13 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
     }
 
     public void doUpdate(Map<String, TopicConfigAndQueueMapping> brokerConfigMap, ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt, boolean force) throws Exception {
+        //check it before
+        for (String broker : brokerConfigMap.keySet()) {
+            String addr = clientMetadata.findMasterBrokerAddr(broker);
+            if (addr == null) {
+                throw new RuntimeException("Can't find addr for broker " + broker);
+            }
+        }
         //If some succeed, and others fail, it will cause inconsistent data
         for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
             String broker = entry.getKey();
@@ -158,7 +166,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
         ClientMetadata clientMetadata = new ClientMetadata();
 
         Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
-        Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<>();
         Set<String> targetBrokers = new HashSet<>();
 
         try {
@@ -202,101 +209,30 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
             }
 
             //get the existed topic config and mapping
+
+            brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
             int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim());
-            {
-                TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
-                clientMetadata.freshTopicRoute(topic, routeData);
-
-                if (routeData != null
-                        && !routeData.getQueueDatas().isEmpty()) {
-                    for (QueueData queueData: routeData.getQueueDatas()) {
-                        String bname = queueData.getBrokerName();
-                        String addr = clientMetadata.findMasterBrokerAddr(bname);
-                        TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
-                        //allow the config is null
-                        if (mapping != null) {
-                            brokerConfigMap.put(bname, mapping);
-                        }
-                    }
-                }
-            }
 
             Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), queueNum);
             if (!brokerConfigMap.isEmpty()) {
                 maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
-                globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
-            }
-            if (queueNum < globalIdMap.size()) {
-                throw new RuntimeException(String.format("Cannot decrease the queue num for static topic %d < %d", queueNum, globalIdMap.size()));
-            }
-            //check the queue number
-            if (queueNum == globalIdMap.size()) {
-                throw new RuntimeException("The topic queue num is equal the existed queue num, do nothing");
             }
 
             {
-                TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, maxEpochAndNum.getKey(), new HashMap<>(), brokerConfigMap);
+                TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, maxEpochAndNum.getKey(), brokerConfigMap, new HashSet<String>(), new HashSet<String>());
                 String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, false);
                 System.out.println("The old mapping data is written to file " + oldMappingDataFile);
             }
 
-
-            //the check is ok, now do the mapping allocation
-            Map<String, Integer> brokerNumMap = targetBrokers.stream().collect(Collectors.toMap( x -> x, x -> 0));
-            final Map<Integer, String> oldIdToBroker = new HashMap<>();
-            globalIdMap.forEach((key, value) -> {
-                String leaderbroker = value.getBname();
-                oldIdToBroker.put(key, leaderbroker);
-                if (!brokerNumMap.containsKey(leaderbroker)) {
-                    brokerNumMap.put(leaderbroker, 1);
-                } else {
-                    brokerNumMap.put(leaderbroker, brokerNumMap.get(leaderbroker) + 1);
-                }
-            });
-            TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(oldIdToBroker, brokerNumMap);
-            allocator.upToNum(queueNum);
-            Map<Integer, String> newIdToBroker = allocator.getIdToBroker();
-
-            //construct the topic configAndMapping
-            long newEpoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
-            for (Map.Entry<Integer, String> e : newIdToBroker.entrySet()) {
-                Integer queueId = e.getKey();
-                String broker = e.getValue();
-                if (globalIdMap.containsKey(queueId)) {
-                    //ignore the exited
-                    continue;
-                }
-                TopicConfigAndQueueMapping configMapping;
-                if (!brokerConfigMap.containsKey(broker)) {
-                    configMapping = new TopicConfigAndQueueMapping(new TopicConfig(topic), new TopicQueueMappingDetail(topic, 0, broker, -1));
-                    configMapping.setWriteQueueNums(1);
-                    configMapping.setReadQueueNums(1);
-                    brokerConfigMap.put(broker, configMapping);
-                } else {
-                    configMapping = brokerConfigMap.get(broker);
-                    configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
-                    configMapping.setReadQueueNums(configMapping.getReadQueueNums() + 1);
-                }
-                LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1);
-                configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem));
-            }
-
-            // set the topic config
-            brokerConfigMap.values().forEach(configMapping -> {
-                configMapping.getMappingDetail().setEpoch(newEpoch);
-                configMapping.getMappingDetail().setTotalQueues(queueNum);
-            });
-            //double check the config
-            TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
-            TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
+            //calculate the new data
+            TopicRemappingDetailWrapper newWrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
 
             {
-                TopicRemappingDetailWrapper newWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, newEpoch, newIdToBroker, brokerConfigMap);
                 String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, true);
                 System.out.println("The new mapping data is written to file " + newMappingDataFile);
             }
 
-            doUpdate(brokerConfigMap, clientMetadata, defaultMQAdminExt, false);
+            doUpdate(newWrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt, false);
 
         } catch (Exception e) {
             throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);