You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/18 09:34:10 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Polish the code for command

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new f225fd0  Polish the code for command
f225fd0 is described below

commit f225fd05de44f5e46dc5da34a27fe0b1e947f0ef
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Nov 18 17:31:41 2021 +0800

    Polish the code for command
---
 .../common/statictopic/TopicQueueMappingUtils.java |  17 +-
 .../statictopic/TopicRemappingDetailWrapper.java   |  25 ++
 .../topic/RemappingStaticTopicSubCommand.java      | 299 ++++++++++++---------
 .../command/topic/UpdateStaticTopicSubCommand.java |  26 +-
 4 files changed, 225 insertions(+), 142 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index 9784199..545b7cf 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -116,13 +116,12 @@ public class TopicQueueMappingUtils {
         return detailList;
     }
 
-    public static Map.Entry<Long, Integer> validConsistenceOfTopicConfigAndQueueMapping(Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
+    public static Map.Entry<Long, Integer> checkConsistenceOfTopicConfigAndQueueMapping(String topic, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
         if (brokerConfigMap == null
             || brokerConfigMap.isEmpty()) {
             return null;
         }
         //make sure it it not null
-        String topic = null;
         long maxEpoch = -1;
         int maxNum = -1;
         for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
@@ -143,9 +142,7 @@ public class TopicQueueMappingUtils {
             }
             if (topic != null
                 && !topic.equals(mappingDetail.getTopic())) {
-                throw new RuntimeException("The topic name is inconsistent in broker  " + broker);
-            } else {
-                topic = mappingDetail.getTopic();
+                throw new RuntimeException("The topic name is not match for broker  " + broker);
             }
 
             if (maxEpoch != -1
@@ -165,7 +162,7 @@ public class TopicQueueMappingUtils {
         return new AbstractMap.SimpleEntry<Long, Integer>(maxEpoch, maxNum);
     }
 
-    public static Map<Integer, TopicQueueMappingOne> buildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean replace, boolean checkConsistence) {
+    public static Map<Integer, TopicQueueMappingOne> checkAndBuildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean replace, boolean checkConsistence) {
         Collections.sort(mappingDetailList, new Comparator<TopicQueueMappingDetail>() {
             @Override
             public int compare(TopicQueueMappingDetail o1, TopicQueueMappingDetail o2) {
@@ -216,10 +213,14 @@ public class TopicQueueMappingUtils {
         return items.get(items.size() - 1);
     }
 
-    public static String writeToTemp(TopicRemappingDetailWrapper wrapper, String suffix) {
+    public static String writeToTemp(TopicRemappingDetailWrapper wrapper, boolean after) {
         String topic = wrapper.getTopic();
         String data = wrapper.toJson();
-        String fileName = System.getProperty("java.io.tmpdir") + File.separator + topic + "-" + wrapper.getEpoch() + "-" + suffix;
+        String suffix = TopicRemappingDetailWrapper.SUFFIX_BEFORE;
+        if (after) {
+            suffix = TopicRemappingDetailWrapper.SUFFIX_AFTER;
+        }
+        String fileName = System.getProperty("java.io.tmpdir") + File.separator + topic + "-" + wrapper.getEpoch() + suffix;
         try {
             MixAll.string2File(data, fileName);
             return fileName;
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java
index d229203..a10dba3 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java
@@ -3,12 +3,17 @@ package org.apache.rocketmq.common.statictopic;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 public class TopicRemappingDetailWrapper extends RemotingSerializable {
     public static final String TYPE_CREATE_OR_UPDATE = "CREATE_OR_UPDATE";
     public static final String TYPE_REMAPPING = "REMAPPING";
 
+    public static final String SUFFIX_BEFORE = ".before";
+    public static final String SUFFIX_AFTER = ".after";
+
 
     private final String topic;
     private final String type;
@@ -17,6 +22,10 @@ public class TopicRemappingDetailWrapper extends RemotingSerializable {
 
     private Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
 
+    private Set<String> brokerToMapIn = new HashSet<String>();
+
+    private Set<String> brokerToMapOut = new HashSet<String>();
+
     public TopicRemappingDetailWrapper(String topic, String type, long epoch, Map<Integer, String> expectedIdToBroker, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
         this.topic = topic;
         this.type = type;
@@ -44,4 +53,20 @@ public class TopicRemappingDetailWrapper extends RemotingSerializable {
     public Map<String, TopicConfigAndQueueMapping> getBrokerConfigMap() {
         return brokerConfigMap;
     }
+
+    public Set<String> getBrokerToMapIn() {
+        return brokerToMapIn;
+    }
+
+    public void setBrokerToMapIn(Set<String> brokerToMapIn) {
+        this.brokerToMapIn = brokerToMapIn;
+    }
+
+    public Set<String> getBrokerToMapOut() {
+        return brokerToMapOut;
+    }
+
+    public void setBrokerToMapOut(Set<String> brokerToMapOut) {
+        this.brokerToMapOut = brokerToMapOut;
+    }
 }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
index 472ce0b..656396c 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
@@ -21,9 +21,9 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionGroup;
 import org.apache.commons.cli.Options;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
 import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
-import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
 import org.apache.rocketmq.common.admin.TopicOffset;
@@ -33,6 +33,7 @@ import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.rpc.ClientMetadata;
+import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.srvutil.ServerUtil;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
@@ -79,24 +80,129 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
         opt = new Option("t", "topic", true, "topic name");
         opt.setRequired(true);
         options.addOption(opt);
+
+        opt = new Option("f", "mapFile", true, "The map file name");
+        opt.setRequired(false);
+        options.addOption(opt);
         return options;
     }
 
 
+    public void executeFromFile(final CommandLine commandLine, final Options options,
+                                RPCHook rpcHook) throws SubCommandException {
+
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+        ClientMetadata clientMetadata = new ClientMetadata();
+
+        try {
+            String topic = commandLine.getOptionValue('t').trim();
+
+            String mapFileName = commandLine.getOptionValue('f').trim();
+            String mapData = MixAll.file2String(mapFileName);
+            TopicRemappingDetailWrapper wrapper = TopicRemappingDetailWrapper.decode(mapData.getBytes(), TopicRemappingDetailWrapper.class);
+            //double check the config
+            TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, wrapper.getBrokerConfigMap());
+            TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true);
+
+            ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+            if (clusterInfo == null
+                    || clusterInfo.getClusterAddrTable().isEmpty()) {
+                throw new RuntimeException("The Cluster info is empty");
+            }
+            clientMetadata.refreshClusterInfo(clusterInfo);
+            doRemapping(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt);
+            return;
+        }catch (Exception e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+
+
+
+    public void doRemapping(String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap,
+                            ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt) throws Exception {
+        // now do the remapping
+        //Step1: let the new leader can be write without the logicOffset
+        for (String broker: brokersToMapIn) {
+            String addr = clientMetadata.findMasterBrokerAddr(broker);
+            TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
+            defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
+        }
+        //Step2: forbid the write of old leader
+        for (String broker: brokersToMapOut) {
+            String addr = clientMetadata.findMasterBrokerAddr(broker);
+            TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
+            defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
+        }
+        //Step3: decide the logic offset
+        for (String broker: brokersToMapOut) {
+            String addr = clientMetadata.findMasterBrokerAddr(broker);
+            TopicStatsTable statsTable = defaultMQAdminExt.examineTopicStats(addr, topic);
+            TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(broker);
+            for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry : mapOutConfig.getMappingDetail().getHostedQueues().entrySet()) {
+                ImmutableList<LogicQueueMappingItem> items = entry.getValue();
+                Integer globalId = entry.getKey();
+                if (items.size() < 2) {
+                    continue;
+                }
+                LogicQueueMappingItem newLeader = items.get(items.size() - 1);
+                LogicQueueMappingItem oldLeader = items.get(items.size() - 2);
+                if (newLeader.getLogicOffset()  > 0) {
+                    continue;
+                }
+                TopicOffset topicOffset = statsTable.getOffsetTable().get(new MessageQueue(topic, oldLeader.getBname(), oldLeader.getQueueId()));
+                if (topicOffset == null) {
+                    throw new RuntimeException("Cannot get the max offset for old leader " + oldLeader);
+                }
+                //TODO check the max offset, will it return -1?
+                if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) {
+                    throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset());
+                }
+                newLeader.setLogicOffset(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset() + 10000));
+                TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(newLeader.getBname());
+                //fresh the new leader
+                mapInConfig.getMappingDetail().putMappingInfo(globalId, items);
+            }
+        }
+        //Step4: write to the new leader with logic offset
+        for (String broker: brokersToMapIn) {
+            String addr = clientMetadata.findMasterBrokerAddr(broker);
+            TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
+            defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
+        }
+    }
+
+
+
+
+
     @Override
     public void execute(final CommandLine commandLine, final Options options,
-        RPCHook rpcHook) throws SubCommandException {
+                        RPCHook rpcHook) throws SubCommandException {
+
+        if (!commandLine.hasOption('t')) {
+            ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+            return;
+        }
+
+        if (commandLine.hasOption("f")) {
+            executeFromFile(commandLine, options, rpcHook);
+            return;
+        }
+
         DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
         defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
 
         ClientMetadata clientMetadata = new ClientMetadata();
-        Map<String, TopicConfigAndQueueMapping> existedTopicConfigMap = new HashMap<>();
+        Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
         Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<>();
-        Set<String> brokers = new HashSet<>();
-        Map.Entry<Long, Integer> maxEpochAndNum = null;
+        Set<String> targetBrokers = new HashSet<>();
+
         try {
-            if ((!commandLine.hasOption("b") && !commandLine.hasOption('c'))
-                    || !commandLine.hasOption('t')) {
+            if ((!commandLine.hasOption("b") && !commandLine.hasOption('c'))) {
                 ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
                 return;
             }
@@ -108,84 +214,67 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
                 throw new RuntimeException("The Cluster info is empty");
             }
             clientMetadata.refreshClusterInfo(clusterInfo);
-
-            if (commandLine.hasOption("b")) {
-                String brokerStrs = commandLine.getOptionValue("b").trim();
-                for (String broker: brokerStrs.split(",")) {
-                    brokers.add(broker.trim());
-                }
-            } else if (commandLine.hasOption("c")) {
-                String clusters = commandLine.getOptionValue('c').trim();
-                for (String cluster : clusters.split(",")) {
-                    cluster = cluster.trim();
-                    if (clusterInfo.getClusterAddrTable().get(cluster) != null) {
-                        brokers.addAll(clusterInfo.getClusterAddrTable().get(cluster));
+            {
+                if (commandLine.hasOption("b")) {
+                    String brokerStrs = commandLine.getOptionValue("b").trim();
+                    for (String broker: brokerStrs.split(",")) {
+                        targetBrokers.add(broker.trim());
+                    }
+                } else if (commandLine.hasOption("c")) {
+                    String clusters = commandLine.getOptionValue('c').trim();
+                    for (String cluster : clusters.split(",")) {
+                        cluster = cluster.trim();
+                        if (clusterInfo.getClusterAddrTable().get(cluster) != null) {
+                            targetBrokers.addAll(clusterInfo.getClusterAddrTable().get(cluster));
+                        }
                     }
                 }
-            }
-            if (brokers.isEmpty()) {
-                throw new RuntimeException("Find none brokers");
-            }
-            for (String broker : brokers) {
-                String addr = clientMetadata.findMasterBrokerAddr(broker);
-                if (addr == null) {
-                    throw new RuntimeException("Can't find addr for broker " + broker);
+                if (targetBrokers.isEmpty()) {
+                    throw new RuntimeException("Find none brokers, do nothing");
+                }
+                for (String broker : targetBrokers) {
+                    String addr = clientMetadata.findMasterBrokerAddr(broker);
+                    if (addr == null) {
+                        throw new RuntimeException("Can't find addr for broker " + broker);
+                    }
                 }
             }
 
             //get the existed topic config and mapping
-            TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
-            clientMetadata.freshTopicRoute(topic, routeData);
-            if (routeData != null
-                    && !routeData.getQueueDatas().isEmpty()) {
-                for (QueueData queueData: routeData.getQueueDatas()) {
-                    String bname = queueData.getBrokerName();
-                    String addr = clientMetadata.findMasterBrokerAddr(bname);
-                    TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
-                    //allow the config is null
-                    if (mapping != null) {
-                        existedTopicConfigMap.put(bname, mapping);
+            {
+                TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+                clientMetadata.freshTopicRoute(topic, routeData);
+                if (routeData != null
+                        && !routeData.getQueueDatas().isEmpty()) {
+                    for (QueueData queueData: routeData.getQueueDatas()) {
+                        String bname = queueData.getBrokerName();
+                        String addr = clientMetadata.findMasterBrokerAddr(bname);
+                        TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
+                        //allow the config is null
+                        if (mapping != null) {
+                            brokerConfigMap.put(bname, mapping);
+                        }
                     }
                 }
             }
 
-            if (existedTopicConfigMap.isEmpty()) {
+            if (brokerConfigMap.isEmpty()) {
                 throw new RuntimeException("No topic route to do the remapping");
             }
 
-            //make sure it it not null
-            existedTopicConfigMap.forEach((key, value) -> {
-                if (value.getMappingDetail() != null) {
-                    throw new RuntimeException("Mapping info should be null in broker " + key);
-                }
-            });
-            //make sure the detail is not dirty
-            existedTopicConfigMap.forEach((key, value) -> {
-                if (!key.equals(value.getMappingDetail().getBname())) {
-                    throw new RuntimeException(String.format("The broker name is not equal %s != %s ", key, value.getMappingDetail().getBname()));
-                }
-                if (value.getMappingDetail().isDirty()) {
-                    throw new RuntimeException("The mapping info is dirty in broker  " + value.getMappingDetail().getBname());
-                }
-            });
-
-            List<TopicQueueMappingDetail> detailList = existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList());
-            //check the epoch and qnum
-            maxEpochAndNum = TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList);
-            for (TopicQueueMappingDetail mappingDetail : detailList) {
-                if (maxEpochAndNum.getKey() != mappingDetail.getEpoch()) {
-                    throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", maxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname()));
-                }
-                if (maxEpochAndNum.getValue() != mappingDetail.getTotalQueues()) {
-                    throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", maxEpochAndNum.getValue(), mappingDetail.getTotalQueues(), mappingDetail.getBname()));
-                }
-            }
-            globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(detailList), false, true);
-
+            final Map.Entry<Long, Integer> maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
+            globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
             //the check is ok, now do the mapping allocation
             int maxNum = maxEpochAndNum.getValue();
             long maxEpoch = maxEpochAndNum.getKey();
-            TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<>(), brokers.stream().collect(Collectors.toMap( x -> x, x -> 0)));
+
+            {
+                TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_REMAPPING, maxEpoch, new HashMap<>(), brokerConfigMap);
+                String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, false);
+                System.out.println("The old mapping data is written to file " + oldMappingDataFile);
+            }
+
+            TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<>(), targetBrokers.stream().collect(Collectors.toMap( x -> x, x -> 0)));
             allocator.upToNum(maxNum);
             Map<String, Integer> expectedBrokerNumMap = allocator.getBrokerNumMap();
             Queue<Integer> waitAssignQueues = new ArrayDeque<Integer>();
@@ -213,6 +302,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
                     expectedIdToBroker.put(queueId, broker);
                 }
             });
+            long newEpoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
 
             //Now construct the remapping info
             Set<String> brokersToMapOut = new HashSet<>();
@@ -230,8 +320,8 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
                 String mapOutBroker = topicQueueMappingOne.getBname();
                 brokersToMapIn.add(mapInBroker);
                 brokersToMapOut.add(mapOutBroker);
-                TopicConfigAndQueueMapping mapInConfig = existedTopicConfigMap.get(mapInBroker);
-                TopicConfigAndQueueMapping mapOutConfig = existedTopicConfigMap.get(mapOutBroker);
+                TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(mapInBroker);
+                TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(mapOutBroker);
 
                 mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums() + 1);
                 mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums() + 1);
@@ -246,61 +336,24 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
                 mapOutConfig.getMappingDetail().putMappingInfo(queueId, resultItems);
             }
 
-            long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
-            existedTopicConfigMap.values().forEach( configMapping -> {
-                configMapping.getMappingDetail().setEpoch(epoch);
+            brokerConfigMap.values().forEach(configMapping -> {
+                configMapping.getMappingDetail().setEpoch(newEpoch);
                 configMapping.getMappingDetail().setTotalQueues(maxNum);
             });
-            TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList())), false, true);
-            // now do the remapping
-            //Step1: let the new leader can be write without the logicOffset
-            for (String broker: brokersToMapIn) {
-                String addr = clientMetadata.findMasterBrokerAddr(broker);
-                TopicConfigAndQueueMapping configMapping = existedTopicConfigMap.get(broker);
-                defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
-            }
-            //Step2: forbid the write of old leader
-            for (String broker: brokersToMapOut) {
-                String addr = clientMetadata.findMasterBrokerAddr(broker);
-                TopicConfigAndQueueMapping configMapping = existedTopicConfigMap.get(broker);
-                defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
-            }
-            //Step3: decide the logic offset
-            for (String broker: brokersToMapOut) {
-                String addr = clientMetadata.findMasterBrokerAddr(broker);
-                TopicStatsTable statsTable = defaultMQAdminExt.examineTopicStats(addr, topic);
-                TopicConfigAndQueueMapping mapOutConfig = existedTopicConfigMap.get(broker);
-                for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry : mapOutConfig.getMappingDetail().getHostedQueues().entrySet()) {
-                    ImmutableList<LogicQueueMappingItem> items = entry.getValue();
-                    Integer globalId = entry.getKey();
-                    if (items.size() < 2) {
-                        continue;
-                    }
-                    LogicQueueMappingItem newLeader = items.get(items.size() - 1);
-                    LogicQueueMappingItem oldLeader = items.get(items.size() - 2);
-                    if (newLeader.getLogicOffset()  > 0) {
-                        continue;
-                    }
-                    TopicOffset topicOffset = statsTable.getOffsetTable().get(new MessageQueue(topic, oldLeader.getBname(), oldLeader.getQueueId()));
-                    if (topicOffset == null) {
-                        throw new RuntimeException("Cannot get the max offset for old leader " + oldLeader);
-                    }
-                    //TODO check the max offset, will it return -1?
-                    if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) {
-                        throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset());
-                    }
-                    newLeader.setLogicOffset(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset() + 10000));
-                    TopicConfigAndQueueMapping mapInConfig = existedTopicConfigMap.get(newLeader.getBname());
-                    //fresh the new leader
-                    mapInConfig.getMappingDetail().putMappingInfo(globalId, items);
-                }
-            }
-            //Step4: write to the new leader with logic offset
-            for (String broker: brokersToMapIn) {
-                String addr = clientMetadata.findMasterBrokerAddr(broker);
-                TopicConfigAndQueueMapping configMapping = existedTopicConfigMap.get(broker);
-                defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
+            //double check
+            TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
+            TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(brokerConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList())), false, true);
+
+            {
+                TopicRemappingDetailWrapper newWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_REMAPPING, newEpoch, expectedIdToBroker, brokerConfigMap);
+                newWrapper.setBrokerToMapIn(brokersToMapIn);
+                newWrapper.setBrokerToMapOut(brokersToMapOut);
+                String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, true);
+                System.out.println("The old mapping data is written to file " + newMappingDataFile);
             }
+
+            doRemapping(topic, brokersToMapIn, brokersToMapOut, brokerConfigMap, clientMetadata, defaultMQAdminExt);
+
         } catch (Exception e) {
             throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
         } finally {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index 21ce1cf..cdfdcfe 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -39,7 +39,6 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.apache.rocketmq.tools.command.SubCommand;
 import org.apache.rocketmq.tools.command.SubCommandException;
 
-import java.nio.charset.Charset;
 import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -84,7 +83,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
         options.addOption(opt);
 
         opt = new Option("f", "mapFile", true, "The map file name");
-        opt.setRequired(true);
+        opt.setRequired(false);
         options.addOption(opt);
 
         return options;
@@ -99,12 +98,13 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
         ClientMetadata clientMetadata = new ClientMetadata();
 
         try {
+            String topic = commandLine.getOptionValue('t').trim();
             String mapFileName = commandLine.getOptionValue('f').trim();
             String mapData = MixAll.file2String(mapFileName);
             TopicRemappingDetailWrapper wrapper = TopicRemappingDetailWrapper.decode(mapData.getBytes(), TopicRemappingDetailWrapper.class);
             //double check the config
-            TopicQueueMappingUtils.validConsistenceOfTopicConfigAndQueueMapping(wrapper.getBrokerConfigMap());
-            TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true);
+            TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, wrapper.getBrokerConfigMap());
+            TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true);
 
             ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
             if (clusterInfo == null
@@ -131,6 +131,11 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
     @Override
     public void execute(final CommandLine commandLine, final Options options,
         RPCHook rpcHook) throws SubCommandException {
+        if (!commandLine.hasOption('t')) {
+            ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+            return;
+        }
+
         if (commandLine.hasOption("f")) {
             executeFromFile(commandLine, options, rpcHook);
             return;
@@ -159,7 +164,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
             }
             clientMetadata.refreshClusterInfo(clusterInfo);
 
-
             String clusters = commandLine.getOptionValue('c').trim();
             for (String cluster : clusters.split(",")) {
                 cluster = cluster.trim();
@@ -200,8 +204,8 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
 
             Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), queueNum);
             if (!brokerConfigMap.isEmpty()) {
-                maxEpochAndNum = TopicQueueMappingUtils.validConsistenceOfTopicConfigAndQueueMapping(brokerConfigMap);
-                globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
+                maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
+                globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
             }
             if (queueNum < globalIdMap.size()) {
                 throw new RuntimeException(String.format("Cannot decrease the queue num for static topic %d < %d", queueNum, globalIdMap.size()));
@@ -213,7 +217,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
 
             {
                 TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, maxEpochAndNum.getKey(), new HashMap<>(), brokerConfigMap);
-                String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, "before");
+                String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, false);
                 System.out.println("The old mapping data is written to file " + oldMappingDataFile);
             }
 
@@ -264,12 +268,12 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
                 configMapping.getMappingDetail().setTotalQueues(queueNum);
             });
             //double check the config
-            TopicQueueMappingUtils.validConsistenceOfTopicConfigAndQueueMapping(brokerConfigMap);
-            TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
+            TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
+            TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
 
             {
                 TopicRemappingDetailWrapper newWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, newEpoch, newIdToBroker, brokerConfigMap);
-                String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, "after");
+                String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, true);
                 System.out.println("The new mapping data is written to file " + newMappingDataFile);
             }