You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/18 08:57:45 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Enable to run from file for createStaticTopic command

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new 165e133  Enable to run from file for createStaticTopic command
165e133 is described below

commit 165e133a64ae34fa992223f3472f6398d3c9cb2a
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Nov 18 16:57:19 2021 +0800

    Enable to run from file for createStaticTopic command
---
 .../protocol/body/TopicConfigSerializeWrapper.java |   6 +-
 .../common/statictopic/TopicQueueMappingUtils.java |  77 ++++++++++
 .../statictopic/TopicRemappingDetailWrapper.java   |  47 +++++++
 .../{ => statictopic}/TopicQueueMappingTest.java   |  12 +-
 .../command/topic/UpdateStaticTopicSubCommand.java | 156 +++++++++++++--------
 5 files changed, 236 insertions(+), 62 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
index 1176e6b..a71c50a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
@@ -17,13 +17,13 @@
 
 package org.apache.rocketmq.common.protocol.body;
 
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
 public class TopicConfigSerializeWrapper extends RemotingSerializable {
     private ConcurrentMap<String, TopicConfig> topicConfigTable =
         new ConcurrentHashMap<String, TopicConfig>();
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index dfb6bbf..9784199 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -17,9 +17,12 @@
 package org.apache.rocketmq.common.statictopic;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.rocketmq.common.MixAll;
 
+import java.io.File;
 import java.util.AbstractMap;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -103,6 +106,65 @@ public class TopicQueueMappingUtils {
         return new AbstractMap.SimpleImmutableEntry<Long, Integer>(epoch, queueNum);
     }
 
+    public static List<TopicQueueMappingDetail> getMappingDetailFromConfig(Collection<TopicConfigAndQueueMapping> configs) {
+        List<TopicQueueMappingDetail> detailList = new ArrayList<TopicQueueMappingDetail>();
+        for (TopicConfigAndQueueMapping configMapping : configs) {
+            if (configMapping.getMappingDetail() != null) {
+                detailList.add(configMapping.getMappingDetail());
+            }
+        }
+        return detailList;
+    }
+
+    public static Map.Entry<Long, Integer> validConsistenceOfTopicConfigAndQueueMapping(Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
+        if (brokerConfigMap == null
+            || brokerConfigMap.isEmpty()) {
+            return null;
+        }
+        //make sure it it not null
+        String topic = null;
+        long maxEpoch = -1;
+        int maxNum = -1;
+        for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
+            String broker = entry.getKey();
+            TopicConfigAndQueueMapping configMapping = entry.getValue();
+            if (configMapping.getMappingDetail() == null) {
+                throw new RuntimeException("Mapping info should not be null in broker " + broker);
+            }
+            TopicQueueMappingDetail mappingDetail = configMapping.getMappingDetail();
+            if (!broker.equals(mappingDetail.getBname())) {
+                throw new RuntimeException(String.format("The broker name is not equal %s != %s ", broker, mappingDetail.getBname()));
+            }
+            if (mappingDetail.isDirty()) {
+                throw new RuntimeException("The mapping info is dirty in broker  " + broker);
+            }
+            if (!configMapping.getTopicName().equals(mappingDetail.getTopic())) {
+                throw new RuntimeException("The topic name is inconsistent in broker  " + broker);
+            }
+            if (topic != null
+                && !topic.equals(mappingDetail.getTopic())) {
+                throw new RuntimeException("The topic name is inconsistent in broker  " + broker);
+            } else {
+                topic = mappingDetail.getTopic();
+            }
+
+            if (maxEpoch != -1
+                && maxEpoch != mappingDetail.getEpoch()) {
+                throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", maxEpoch, mappingDetail.getEpoch(), mappingDetail.getBname()));
+            } else {
+                maxEpoch = mappingDetail.getEpoch();
+            }
+
+            if (maxNum != -1
+                && maxNum != mappingDetail.getTotalQueues()) {
+                throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", maxNum, mappingDetail.getTotalQueues(), mappingDetail.getBname()));
+            } else {
+                maxNum = mappingDetail.getTotalQueues();
+            }
+        }
+        return new AbstractMap.SimpleEntry<Long, Integer>(maxEpoch, maxNum);
+    }
+
     public static Map<Integer, TopicQueueMappingOne> buildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean replace, boolean checkConsistence) {
         Collections.sort(mappingDetailList, new Comparator<TopicQueueMappingDetail>() {
             @Override
@@ -153,4 +215,19 @@ public class TopicQueueMappingUtils {
         assert items.size() > 0;
         return items.get(items.size() - 1);
     }
+
+    public static String writeToTemp(TopicRemappingDetailWrapper wrapper, String suffix) {
+        String topic = wrapper.getTopic();
+        String data = wrapper.toJson();
+        String fileName = System.getProperty("java.io.tmpdir") + File.separator + topic + "-" + wrapper.getEpoch() + "-" + suffix;
+        try {
+            MixAll.string2File(data, fileName);
+            return fileName;
+        } catch (Exception e) {
+            throw new RuntimeException("write file failed " + fileName,e);
+        }
+    }
+
+
+
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java
new file mode 100644
index 0000000..d229203
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java
@@ -0,0 +1,47 @@
+package org.apache.rocketmq.common.statictopic;
+
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class TopicRemappingDetailWrapper extends RemotingSerializable {
+    public static final String TYPE_CREATE_OR_UPDATE = "CREATE_OR_UPDATE";
+    public static final String TYPE_REMAPPING = "REMAPPING";
+
+
+    private final String topic;
+    private final String type;
+    private final long epoch;
+    private Map<Integer, String> expectedIdToBroker = new HashMap<Integer, String>();
+
+    private Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
+
+    public TopicRemappingDetailWrapper(String topic, String type, long epoch, Map<Integer, String> expectedIdToBroker, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
+        this.topic = topic;
+        this.type = type;
+        this.epoch = epoch;
+        this.expectedIdToBroker = expectedIdToBroker;
+        this.brokerConfigMap = brokerConfigMap;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public long getEpoch() {
+        return epoch;
+    }
+
+    public Map<Integer, String> getExpectedIdToBroker() {
+        return expectedIdToBroker;
+    }
+
+    public Map<String, TopicConfigAndQueueMapping> getBrokerConfigMap() {
+        return brokerConfigMap;
+    }
+}
diff --git a/common/src/test/java/org/apache/rocketmq/common/TopicQueueMappingTest.java b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java
similarity index 88%
rename from common/src/test/java/org/apache/rocketmq/common/TopicQueueMappingTest.java
rename to common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java
index e6e3528..a1b3d27 100644
--- a/common/src/test/java/org/apache/rocketmq/common/TopicQueueMappingTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java
@@ -1,4 +1,4 @@
-package org.apache.rocketmq.common;
+package org.apache.rocketmq.common.statictopic;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONArray;
@@ -9,11 +9,19 @@ import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.File;
 import java.util.Map;
 
 public class TopicQueueMappingTest {
 
     @Test
+    public void testWriteToFile() {
+        System.out.println(System.getProperty("java.io.tmpdir"));
+        System.out.println(File.separator);
+    }
+
+
+    @Test
     public void testJsonSerialize() {
         LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(1, 2, "broker01", 33333333333333333L, 44444444444444444L, 555555555555555555L, 6666666666666666L, 77777777777777777L);
         String mappingItemJson = JSON.toJSONString(mappingItem) ;
@@ -30,7 +38,7 @@ public class TopicQueueMappingTest {
         Assert.assertEquals(mappingItemMap.get("timeOfStart"), mappingItem.getTimeOfStart());
         Assert.assertEquals(mappingItemMap.get("timeOfEnd"), mappingItem.getTimeOfEnd());
 
-        TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail("test", 1, "broker01");
+        TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail("test", 1, "broker01", System.currentTimeMillis());
         mappingDetail.putMappingInfo(0, ImmutableList.of(mappingItem));
 
         String mappingDetailJson = JSON.toJSONString(mappingDetail);
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index 65c7377..21ce1cf 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -21,6 +21,7 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionGroup;
 import org.apache.commons.cli.Options;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
@@ -31,17 +32,18 @@ import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.rpc.ClientMetadata;
+import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.srvutil.ServerUtil;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.apache.rocketmq.tools.command.SubCommand;
 import org.apache.rocketmq.tools.command.SubCommandException;
 
+import java.nio.charset.Charset;
 import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -81,18 +83,64 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
         opt.setRequired(true);
         options.addOption(opt);
 
+        opt = new Option("f", "mapFile", true, "The map file name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
         return options;
     }
 
 
+
+    public void executeFromFile(final CommandLine commandLine, final Options options,
+                        RPCHook rpcHook) throws SubCommandException {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+        ClientMetadata clientMetadata = new ClientMetadata();
+
+        try {
+            String mapFileName = commandLine.getOptionValue('f').trim();
+            String mapData = MixAll.file2String(mapFileName);
+            TopicRemappingDetailWrapper wrapper = TopicRemappingDetailWrapper.decode(mapData.getBytes(), TopicRemappingDetailWrapper.class);
+            //double check the config
+            TopicQueueMappingUtils.validConsistenceOfTopicConfigAndQueueMapping(wrapper.getBrokerConfigMap());
+            TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true);
+
+            ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+            if (clusterInfo == null
+                    || clusterInfo.getClusterAddrTable().isEmpty()) {
+                throw new RuntimeException("The Cluster info is empty");
+            }
+            clientMetadata.refreshClusterInfo(clusterInfo);
+            //If some succeed, and others fail, it will cause inconsistent data
+            for (Map.Entry<String, TopicConfigAndQueueMapping> entry : wrapper.getBrokerConfigMap().entrySet()) {
+                String broker = entry.getKey();
+                String addr = clientMetadata.findMasterBrokerAddr(broker);
+                TopicConfigAndQueueMapping configMapping = entry.getValue();
+                defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
+            }
+            return;
+        }catch (Exception e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+
+
     @Override
     public void execute(final CommandLine commandLine, final Options options,
         RPCHook rpcHook) throws SubCommandException {
+        if (commandLine.hasOption("f")) {
+            executeFromFile(commandLine, options, rpcHook);
+            return;
+        }
+
         DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
         defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
-
         ClientMetadata clientMetadata = new ClientMetadata();
-        Map<String, TopicConfigAndQueueMapping> existedTopicConfigMap = new HashMap<>();
+
+        Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
         Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<>();
         Set<String> brokers = new HashSet<>();
 
@@ -111,8 +159,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
             }
             clientMetadata.refreshClusterInfo(clusterInfo);
 
-            String topic = commandLine.getOptionValue('t').trim();
-            int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim());
+
             String clusters = commandLine.getOptionValue('c').trim();
             for (String cluster : clusters.split(",")) {
                 cluster = cluster.trim();
@@ -131,51 +178,30 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
             }
 
             //get the existed topic config and mapping
-            TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
-            clientMetadata.freshTopicRoute(topic, routeData);
-            if (routeData != null
-                    && !routeData.getQueueDatas().isEmpty()) {
-                for (QueueData queueData: routeData.getQueueDatas()) {
-                    String bname = queueData.getBrokerName();
-                    String addr = clientMetadata.findMasterBrokerAddr(bname);
-                    TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
-                    //allow the config is null
-                    if (mapping != null) {
-                        existedTopicConfigMap.put(bname, mapping);
+            String topic = commandLine.getOptionValue('t').trim();
+            int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim());
+            {
+                TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+                clientMetadata.freshTopicRoute(topic, routeData);
+
+                if (routeData != null
+                        && !routeData.getQueueDatas().isEmpty()) {
+                    for (QueueData queueData: routeData.getQueueDatas()) {
+                        String bname = queueData.getBrokerName();
+                        String addr = clientMetadata.findMasterBrokerAddr(bname);
+                        TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
+                        //allow the config is null
+                        if (mapping != null) {
+                            brokerConfigMap.put(bname, mapping);
+                        }
                     }
                 }
             }
 
             Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), queueNum);
-            if (!existedTopicConfigMap.isEmpty()) {
-                //make sure it it not null
-                existedTopicConfigMap.forEach((key, value) -> {
-                    if (value.getMappingDetail() != null) {
-                        throw new RuntimeException("Mapping info should be null in broker " + key);
-                    }
-                });
-                //make sure the detail is not dirty
-                existedTopicConfigMap.forEach((key, value) -> {
-                    if (!key.equals(value.getMappingDetail().getBname())) {
-                        throw new RuntimeException(String.format("The broker name is not equal %s != %s ", key, value.getMappingDetail().getBname()));
-                    }
-                    if (value.getMappingDetail().isDirty()) {
-                        throw new RuntimeException("The mapping info is dirty in broker  " + value.getMappingDetail().getBname());
-                    }
-                });
-
-                List<TopicQueueMappingDetail> detailList = existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList());
-                //check the epoch and qnum
-                maxEpochAndNum = TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList);
-                for (TopicQueueMappingDetail mappingDetail : detailList) {
-                    if (maxEpochAndNum.getKey() != mappingDetail.getEpoch()) {
-                        throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", maxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname()));
-                    }
-                    if (maxEpochAndNum.getValue() != mappingDetail.getTotalQueues()) {
-                        throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", maxEpochAndNum.getValue(), mappingDetail.getTotalQueues(), mappingDetail.getBname()));
-                    }
-                }
-                globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(detailList), false, true);
+            if (!brokerConfigMap.isEmpty()) {
+                maxEpochAndNum = TopicQueueMappingUtils.validConsistenceOfTopicConfigAndQueueMapping(brokerConfigMap);
+                globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
             }
             if (queueNum < globalIdMap.size()) {
                 throw new RuntimeException(String.format("Cannot decrease the queue num for static topic %d < %d", queueNum, globalIdMap.size()));
@@ -184,24 +210,32 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
             if (queueNum == globalIdMap.size()) {
                 throw new RuntimeException("The topic queue num is equal the existed queue num, do nothing");
             }
+
+            {
+                TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, maxEpochAndNum.getKey(), new HashMap<>(), brokerConfigMap);
+                String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, "before");
+                System.out.println("The old mapping data is written to file " + oldMappingDataFile);
+            }
+
+
             //the check is ok, now do the mapping allocation
             Map<String, Integer> brokerNumMap = brokers.stream().collect(Collectors.toMap( x -> x, x -> 0));
-            Map<Integer, String> idToBroker = new HashMap<>();
+            final Map<Integer, String> oldIdToBroker = new HashMap<>();
             globalIdMap.forEach((key, value) -> {
                 String leaderbroker = value.getBname();
-                idToBroker.put(key, leaderbroker);
+                oldIdToBroker.put(key, leaderbroker);
                 if (!brokerNumMap.containsKey(leaderbroker)) {
                     brokerNumMap.put(leaderbroker, 1);
                 } else {
                     brokerNumMap.put(leaderbroker, brokerNumMap.get(leaderbroker) + 1);
                 }
             });
-            TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(idToBroker, brokerNumMap);
+            TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(oldIdToBroker, brokerNumMap);
             allocator.upToNum(queueNum);
             Map<Integer, String> newIdToBroker = allocator.getIdToBroker();
 
             //construct the topic configAndMapping
-            long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
+            long newEpoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
             for (Map.Entry<Integer, String> e : newIdToBroker.entrySet()) {
                 Integer queueId = e.getKey();
                 String broker = e.getValue();
@@ -210,13 +244,13 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
                     continue;
                 }
                 TopicConfigAndQueueMapping configMapping;
-                if (!existedTopicConfigMap.containsKey(broker)) {
+                if (!brokerConfigMap.containsKey(broker)) {
                     configMapping = new TopicConfigAndQueueMapping(new TopicConfig(topic), new TopicQueueMappingDetail(topic, 0, broker, -1));
                     configMapping.setWriteQueueNums(1);
                     configMapping.setReadQueueNums(1);
-                    existedTopicConfigMap.put(broker, configMapping);
+                    brokerConfigMap.put(broker, configMapping);
                 } else {
-                    configMapping = existedTopicConfigMap.get(broker);
+                    configMapping = brokerConfigMap.get(broker);
                     configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
                     configMapping.setReadQueueNums(configMapping.getReadQueueNums() + 1);
                 }
@@ -224,15 +258,23 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
                 configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem));
             }
 
-            //double check the topic config map
-            existedTopicConfigMap.values().forEach( configMapping -> {
-                configMapping.getMappingDetail().setEpoch(epoch);
+            // set the topic config
+            brokerConfigMap.values().forEach(configMapping -> {
+                configMapping.getMappingDetail().setEpoch(newEpoch);
                 configMapping.getMappingDetail().setTotalQueues(queueNum);
             });
-            TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList())), false, true);
+            //double check the config
+            TopicQueueMappingUtils.validConsistenceOfTopicConfigAndQueueMapping(brokerConfigMap);
+            TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
+
+            {
+                TopicRemappingDetailWrapper newWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, newEpoch, newIdToBroker, brokerConfigMap);
+                String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, "after");
+                System.out.println("The new mapping data is written to file " + newMappingDataFile);
+            }
 
             //If some succeed, and others fail, it will cause inconsistent data
-            for (Map.Entry<String, TopicConfigAndQueueMapping> entry : existedTopicConfigMap.entrySet()) {
+            for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
                 String broker = entry.getKey();
                 String addr = clientMetadata.findMasterBrokerAddr(broker);
                 TopicConfigAndQueueMapping configMapping = entry.getValue();