You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/02 12:01:37 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Refactor the admin code, reduce exposing apis

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new 2dd6594  Refactor the admin code, reduce exposing apis
2dd6594 is described below

commit 2dd659494475d2212609fc24b872a10f422b9180
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Dec 2 20:01:13 2021 +0800

    Refactor the admin code, reduce exposing apis
---
 .../common/statictopic/TopicQueueMappingUtils.java |   9 +-
 .../statictopic/TopicQueueMappingUtilsTest.java    |  10 +-
 .../util/{MQAdmin.java => MQAdminTestUtils.java}   |  28 ++-
 .../org/apache/rocketmq/test/base/BaseConf.java    |   6 +-
 .../rocketmq/test/base/IntegrationTestBase.java    |   4 +-
 .../apache/rocketmq/test/smoke/StaticTopicIT.java  |  87 ++++---
 .../rocketmq/tools/admin/DefaultMQAdminExt.java    |   9 -
 .../tools/admin/DefaultMQAdminExtImpl.java         | 133 -----------
 .../apache/rocketmq/tools/admin/MQAdminExt.java    |   3 -
 .../apache/rocketmq/tools/admin/MQAdminUtils.java  | 254 +++++++++++++++++++++
 .../topic/RemappingStaticTopicSubCommand.java      |  24 +-
 .../command/topic/UpdateStaticTopicSubCommand.java |  43 +---
 12 files changed, 344 insertions(+), 266 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index 975a5ba..e56d585 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -422,9 +422,8 @@ public class TopicQueueMappingUtils {
         }
     }
 
-    public static TopicRemappingDetailWrapper createTopicConfigMapping(String topic, int queueNum, Set<String> targetBrokers, Set<String> nonTargetBrokers,  Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
+    public static TopicRemappingDetailWrapper createTopicConfigMapping(String topic, int queueNum, Set<String> targetBrokers, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
         checkTargetBrokersComplete(targetBrokers, brokerConfigMap);
-        checkNonTargetBrokers(targetBrokers, nonTargetBrokers);
         Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<Integer, TopicQueueMappingOne>();
         Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<Long, Integer>(System.currentTimeMillis(), queueNum);
         if (!brokerConfigMap.isEmpty()) {
@@ -484,12 +483,6 @@ public class TopicQueueMappingUtils {
             TopicQueueMappingDetail.putMappingInfo(configMapping.getMappingDetail(), queueId, new ArrayList<LogicQueueMappingItem>(Collections.singletonList(mappingItem)));
         }
 
-        //set the non target brokers
-        for (String broker : nonTargetBrokers) {
-            if (!brokerConfigMap.containsKey(broker)) {
-                brokerConfigMap.put(broker, new TopicConfigAndQueueMapping(new TopicConfig(topic, 0, 0), new TopicQueueMappingDetail(topic, queueNum, broker, newEpoch)));
-            }
-        }
         // set the topic config
         for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
             TopicConfigAndQueueMapping configMapping = entry.getValue();
diff --git a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtilsTest.java b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtilsTest.java
index 93cad48..f5cd0ef 100644
--- a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtilsTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtilsTest.java
@@ -128,9 +128,9 @@ public class TopicQueueMappingUtilsTest {
             Set<String> targetBrokers = buildTargetBrokers(2 * i);
             Set<String> nonTargetBrokers = buildTargetBrokers(2 * i, "test");
             queueNum = 10 * i;
-            TopicRemappingDetailWrapper wrapper  = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, nonTargetBrokers, brokerConfigMap);
+            TopicRemappingDetailWrapper wrapper  = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
             Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
-            Assert.assertEquals(4 * i, brokerConfigMap.size());
+            Assert.assertEquals(2 * i, brokerConfigMap.size());
 
             //do the check manually
             Map.Entry<Long, Integer> maxEpochAndNum = TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap);
@@ -168,7 +168,7 @@ public class TopicQueueMappingUtilsTest {
         int queueNum = 7;
         Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
         Set<String>  originalBrokers = buildTargetBrokers(2);
-        TopicRemappingDetailWrapper wrapper  = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, originalBrokers, new HashSet<String>(), brokerConfigMap);
+        TopicRemappingDetailWrapper wrapper  = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, originalBrokers, brokerConfigMap);
         Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
         Assert.assertEquals(2, brokerConfigMap.size());
 
@@ -213,7 +213,7 @@ public class TopicQueueMappingUtilsTest {
         Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
         Set<String>  originalBrokers = buildTargetBrokers(2);
         {
-            TopicRemappingDetailWrapper wrapper  = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, originalBrokers, new HashSet<String>(), brokerConfigMap);
+            TopicRemappingDetailWrapper wrapper  = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, originalBrokers,  brokerConfigMap);
             Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
             Assert.assertEquals(2, brokerConfigMap.size());
         }
@@ -234,7 +234,7 @@ public class TopicQueueMappingUtilsTest {
         int queueNum = 10;
         Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
         Set<String> targetBrokers = buildTargetBrokers(2);
-        TopicRemappingDetailWrapper wrapper  = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, new HashSet<String>(), brokerConfigMap);
+        TopicRemappingDetailWrapper wrapper  = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
         Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap);
         Assert.assertEquals(2, brokerConfigMap.size());
         TopicConfigAndQueueMapping configMapping = brokerConfigMap.values().iterator().next();
diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
similarity index 76%
rename from test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java
rename to test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
index 8c0caa6..9de6205 100644
--- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java
+++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
@@ -18,6 +18,7 @@
 package org.apache.rocketmq.test.util;
 
 import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ForkJoinPool;
@@ -25,12 +26,16 @@ import org.apache.log4j.Logger;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
+import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.MQAdminUtils;
 import org.apache.rocketmq.tools.command.CommandUtil;
 
-public class MQAdmin {
-    private static Logger log = Logger.getLogger(MQAdmin.class);
+public class MQAdminTestUtils {
+    private static Logger log = Logger.getLogger(MQAdminTestUtils.class);
 
     public static boolean createTopic(String nameSrvAddr, String clusterName, String topic,
         int queueNum) {
@@ -163,4 +168,23 @@ public class MQAdmin {
         ForkJoinPool.commonPool().execute(mqAdminExt::shutdown);
     }
 
+    //should only be used in tests: if an intermediate operation fails, it does not back up the brokerConfigMap
+    public static Map<String, TopicConfigAndQueueMapping> createStaticTopic(String topic, int queueNum, Set<String> targetBrokers, DefaultMQAdminExt defaultMQAdminExt) throws Exception {
+        Map<String, TopicConfigAndQueueMapping> brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
+        assert  brokerConfigMap.isEmpty();
+        TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
+        MQAdminUtils.completeNoTargetBrokers(brokerConfigMap, defaultMQAdminExt);
+        MQAdminUtils.updateTopicConfigMappingAll(brokerConfigMap, defaultMQAdminExt, false);
+        return brokerConfigMap;
+    }
+
+    //should only be used in tests: if an intermediate operation fails, it does not back up the brokerConfigMap
+    public static void remappingStaticTopic(String topic, Set<String> targetBrokers, DefaultMQAdminExt defaultMQAdminExt) throws Exception {
+        Map<String, TopicConfigAndQueueMapping> brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
+        assert !brokerConfigMap.isEmpty();
+        TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.remappingStaticTopic(topic, brokerConfigMap, targetBrokers);
+        MQAdminUtils.completeNoTargetBrokers(brokerConfigMap, defaultMQAdminExt);
+        MQAdminUtils.remappingStaticTopic(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), brokerConfigMap, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, false, defaultMQAdminExt);
+    }
+
 }
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
index 53a7ab3..ddeee6e 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@ -31,8 +31,6 @@ import org.apache.log4j.Logger;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.client.consumer.MQPullConsumer;
 import org.apache.rocketmq.client.consumer.MQPushConsumer;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.MQProducer;
 import org.apache.rocketmq.client.producer.TransactionListener;
 import org.apache.rocketmq.common.MQVersion;
@@ -47,7 +45,7 @@ import org.apache.rocketmq.test.clientinterface.AbstractMQConsumer;
 import org.apache.rocketmq.test.clientinterface.AbstractMQProducer;
 import org.apache.rocketmq.test.factory.ConsumerFactory;
 import org.apache.rocketmq.test.listener.AbstractListener;
-import org.apache.rocketmq.test.util.MQAdmin;
+import org.apache.rocketmq.test.util.MQAdminTestUtils;
 import org.apache.rocketmq.test.util.MQRandomUtils;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.apache.rocketmq.tools.admin.MQAdminExt;
@@ -122,7 +120,7 @@ public class BaseConf {
     }
 
     public static String initConsumerGroup(String group) {
-        MQAdmin.createSub(nsAddr, clusterName, group);
+        MQAdminTestUtils.createSub(nsAddr, clusterName, group);
         return group;
     }
 
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
index 50dc8fc..06f079c 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
@@ -34,7 +34,7 @@ import org.apache.rocketmq.namesrv.NamesrvController;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.rocketmq.test.util.MQAdmin;
+import org.apache.rocketmq.test.util.MQAdminTestUtils;
 import org.apache.rocketmq.test.util.TestUtils;
 
 public class IntegrationTestBase {
@@ -166,7 +166,7 @@ public class IntegrationTestBase {
         boolean createResult;
 
         while (true) {
-            createResult = MQAdmin.createTopic(nsAddr, clusterName, topic, queueNumbers);
+            createResult = MQAdminTestUtils.createTopic(nsAddr, clusterName, topic, queueNumbers);
             if (createResult) {
                 break;
             } else if (System.currentTimeMillis() - startTime > topicCreateTime) {
diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
index c1cc60b..aeddef6 100644
--- a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
@@ -1,10 +1,8 @@
 package org.apache.rocketmq.test.smoke;
 
 import org.apache.log4j.Logger;
+import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.common.message.MessageClientExt;
-import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
@@ -12,20 +10,20 @@ import org.apache.rocketmq.common.rpc.ClientMetadata;
 import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
-import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
 import org.apache.rocketmq.test.base.BaseConf;
 import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
 import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
 import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
+import org.apache.rocketmq.test.util.MQAdminTestUtils;
 import org.apache.rocketmq.test.util.MQRandomUtils;
 import org.apache.rocketmq.test.util.VerifyUtils;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.MQAdminUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.FixMethodOrder;
 import org.junit.Test;
-import sun.jvm.hotspot.runtime.aarch64.AARCH64CurrentFrameGuess;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -45,50 +43,44 @@ public class StaticTopicIT extends BaseConf {
 
     private static Logger logger = Logger.getLogger(StaticTopicIT.class);
     private DefaultMQAdminExt defaultMQAdminExt;
-    private ClientMetadata clientMetadata;
 
     @Before
     public void setUp() throws Exception {
         System.setProperty("rocketmq.client.rebalance.waitInterval", "500");
         defaultMQAdminExt = getAdmin(nsAddr);
         waitBrokerRegistered(nsAddr, clusterName);
-        clientMetadata = new ClientMetadata();
         defaultMQAdminExt.start();
-        ClusterInfo clusterInfo  = defaultMQAdminExt.examineBrokerClusterInfo();
-        if (clusterInfo == null
-                || clusterInfo.getClusterAddrTable().isEmpty()) {
-            throw new RuntimeException("The Cluster info is empty");
-        }
-        clientMetadata.refreshClusterInfo(clusterInfo);
     }
 
-    public Map<String, TopicConfigAndQueueMapping> createStaticTopic(String topic, int queueNum, Set<String> targetBrokers) throws Exception {
-        Map<String, TopicConfigAndQueueMapping> brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
-        Assert.assertTrue(brokerConfigMap.isEmpty());
-        TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, new HashSet<>(), brokerConfigMap);
-        Assert.assertEquals(targetBrokers.size(), brokerConfigMap.size());
-        //If some succeed, and others fail, it will cause inconsistent data
-        for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
-            String broker = entry.getKey();
-            String addr = clientMetadata.findMasterBrokerAddr(broker);
-            TopicConfigAndQueueMapping configMapping = entry.getValue();
-            defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false);
+    @Test
+    public void testNoTargetBrokers() throws Exception {
+        String topic = "static" + MQRandomUtils.getRandomTopic();
+        int queueNum = 10;
+        {
+            Set<String> targetBrokers = new HashSet<>();
+            targetBrokers.add(broker1Name);
+            MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
+            Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
+            Assert.assertEquals(2, remoteBrokerConfigMap.size());
+            TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap);
+            Map<Integer, TopicQueueMappingOne>  globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
+            Assert.assertEquals(queueNum, globalIdMap.size());
+            TopicConfigAndQueueMapping configMapping = remoteBrokerConfigMap.get(broker2Name);
+            Assert.assertEquals(0, configMapping.getWriteQueueNums());
+            Assert.assertEquals(0, configMapping.getReadQueueNums());
+            Assert.assertEquals(0, configMapping.getMappingDetail().getHostedQueues().size());
         }
-        return brokerConfigMap;
-    }
-
-    public void remappingStaticTopic(String topic, Set<String> targetBrokers) throws Exception {
-        Map<String, TopicConfigAndQueueMapping> brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
-        Assert.assertFalse(brokerConfigMap.isEmpty());
-        TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.remappingStaticTopic(topic, brokerConfigMap, targetBrokers);
-        defaultMQAdminExt.remappingStaticTopic(clientMetadata, topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), brokerConfigMap, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, false);
-    }
-
-
-
 
-    @Test
-    public void testNonTargetBrokers() {
+        {
+            Set<String> targetBrokers = new HashSet<>();
+            targetBrokers.add(broker2Name);
+            MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
+            Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
+            Assert.assertEquals(2, remoteBrokerConfigMap.size());
+            TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap);
+            Map<Integer, TopicQueueMappingOne>  globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
+            Assert.assertEquals(queueNum, globalIdMap.size());
+        }
 
     }
 
@@ -102,10 +94,10 @@ public class StaticTopicIT extends BaseConf {
         int queueNum = 10;
         int msgEachQueue = 100;
         //create static topic
-        Map<String, TopicConfigAndQueueMapping> localBrokerConfigMap = createStaticTopic(topic, queueNum, getBrokers());
+        Map<String, TopicConfigAndQueueMapping> localBrokerConfigMap = MQAdminTestUtils.createStaticTopic(topic, queueNum, getBrokers(), defaultMQAdminExt);
         //check the static topic config
         {
-            Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
+            Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
             Assert.assertEquals(2, remoteBrokerConfigMap.size());
             for (Map.Entry<String, TopicConfigAndQueueMapping> entry: remoteBrokerConfigMap.entrySet())  {
                 String broker = entry.getKey();
@@ -178,7 +170,7 @@ public class StaticTopicIT extends BaseConf {
     }
 
     @Test
-    public void testDoubleReadCheck() throws Exception {
+    public void testDoubleReadCheckConsumerOffset() throws Exception {
         String topic = "static" + MQRandomUtils.getRandomTopic();
         String group = initConsumerGroup();
         RMQNormalProducer producer = getProducer(nsAddr, topic);
@@ -194,7 +186,7 @@ public class StaticTopicIT extends BaseConf {
         {
             Set<String> targetBrokers = new HashSet<>();
             targetBrokers.add(broker1Name);
-            createStaticTopic(topic, queueNum, targetBrokers);
+            MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
         }
         //produce the messages
         {
@@ -217,7 +209,7 @@ public class StaticTopicIT extends BaseConf {
         {
             Set<String> targetBrokers = new HashSet<>();
             targetBrokers.add(broker2Name);
-            remappingStaticTopic(topic, targetBrokers);
+            MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
 
         }
         //make the metadata
@@ -226,8 +218,8 @@ public class StaticTopicIT extends BaseConf {
 
         {
             producer = getProducer(nsAddr, topic);
+            ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt);
             //just refresh the metadata
-            defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
             List<MessageQueue> messageQueueList = producer.getMessageQueue();
             for(MessageQueue messageQueue: messageQueueList) {
                 producer.send(msgEachQueue, messageQueue);
@@ -276,7 +268,7 @@ public class StaticTopicIT extends BaseConf {
         {
             Set<String> targetBrokers = new HashSet<>();
             targetBrokers.add(broker1Name);
-            createStaticTopic(topic, queueNum, targetBrokers);
+            MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
         }
         //System.out.printf("%s %s\n", broker1Name, clientMetadata.findMasterBrokerAddr(broker1Name));
         //System.out.printf("%s %s\n", broker2Name, clientMetadata.findMasterBrokerAddr(broker2Name));
@@ -310,8 +302,8 @@ public class StaticTopicIT extends BaseConf {
         {
             Set<String> targetBrokers = new HashSet<>();
             targetBrokers.add(broker2Name);
-            remappingStaticTopic(topic, targetBrokers);
-            Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
+            MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
+            Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
 
             TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap);
             Map<Integer, TopicQueueMappingOne>  globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
@@ -324,6 +316,7 @@ public class StaticTopicIT extends BaseConf {
         Thread.sleep(500);
         producer.setDebug(true);
         {
+            ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt);
             List<MessageQueue> messageQueueList = producer.getMessageQueue();
             for (int i = 0; i < queueNum; i++) {
                 MessageQueue messageQueue = messageQueueList.get(i);
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index a36948c..5c80e86 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -220,11 +220,6 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
     }
 
     @Override
-    public Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException,  InterruptedException, MQBrokerException {
-        return this.defaultMQAdminExtImpl.examineTopicConfigAll(clientMetadata, topic);
-    }
-
-    @Override
     public TopicStatsTable examineTopicStats(
         String topic) throws RemotingException, MQClientException, InterruptedException,
         MQBrokerException {
@@ -621,9 +616,5 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
         this.defaultMQAdminExtImpl.createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force);
     }
 
-    @Override
-    public void remappingStaticTopic(ClientMetadata clientMetadata, String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, int blockSeqSize, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        this.defaultMQAdminExtImpl.remappingStaticTopic(clientMetadata, topic, brokersToMapIn, brokersToMapOut, brokerConfigMap, blockSeqSize, force);
-    }
 
 }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 07c4bf3..7b0450c 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -1056,139 +1056,6 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
 
 
     @Override
-    public void remappingStaticTopic(ClientMetadata clientMetadata, String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, int blockSeqSize, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        for (String broker : brokerConfigMap.keySet()) {
-            String addr = clientMetadata.findMasterBrokerAddr(broker);
-            if (addr == null) {
-                throw new RuntimeException("Can't find addr for broker " + broker);
-            }
-        }
-        // now do the remapping
-        //Step1: let the new leader can be write without the logicOffset
-        for (String broker: brokersToMapIn) {
-            String addr = clientMetadata.findMasterBrokerAddr(broker);
-            TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
-            createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
-        }
-        //Step2: forbid the write of old leader
-        for (String broker: brokersToMapOut) {
-            String addr = clientMetadata.findMasterBrokerAddr(broker);
-            TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
-            createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
-        }
-        //Step3: decide the logic offset
-        for (String broker: brokersToMapOut) {
-            String addr = clientMetadata.findMasterBrokerAddr(broker);
-            TopicStatsTable statsTable = examineTopicStats(addr, topic);
-            TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(broker);
-            for (Map.Entry<Integer, List<LogicQueueMappingItem>> entry : mapOutConfig.getMappingDetail().getHostedQueues().entrySet()) {
-                List<LogicQueueMappingItem> items = entry.getValue();
-                Integer globalId = entry.getKey();
-                if (items.size() < 2) {
-                    continue;
-                }
-                LogicQueueMappingItem newLeader = items.get(items.size() - 1);
-                LogicQueueMappingItem oldLeader = items.get(items.size() - 2);
-                if (newLeader.getLogicOffset()  > 0) {
-                    continue;
-                }
-                TopicOffset topicOffset = statsTable.getOffsetTable().get(new MessageQueue(topic, oldLeader.getBname(), oldLeader.getQueueId()));
-                if (topicOffset == null) {
-                    throw new RuntimeException("Cannot get the max offset for old leader " + oldLeader);
-                }
-                //TODO check the max offset, will it return -1?
-                if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) {
-                    throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset());
-                }
-                newLeader.setLogicOffset(TopicQueueMappingUtils.blockSeqRoundUp(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset()), blockSeqSize));
-                TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(newLeader.getBname());
-                //fresh the new leader
-                TopicQueueMappingDetail.putMappingInfo(mapInConfig.getMappingDetail(), globalId, items);
-            }
-        }
-        //Step4: write to the new leader with logic offset
-        for (String broker: brokersToMapIn) {
-            String addr = clientMetadata.findMasterBrokerAddr(broker);
-            TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
-            createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
-        }
-        //Step5: write the non-target brokers
-        for (String broker: brokerConfigMap.keySet()) {
-            if (brokersToMapIn.contains(broker) || brokersToMapOut.contains(broker)) {
-                continue;
-            }
-            String addr = clientMetadata.findMasterBrokerAddr(broker);
-            TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
-            createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
-        }
-    }
-
-    @Override
-    public Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException,  InterruptedException, MQBrokerException {
-        Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
-        boolean getFromBrokers = false;
-        TopicRouteData routeData = null;
-        try {
-            routeData = examineTopicRouteInfo(topic);
-        } catch (MQClientException  exception) {
-            if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
-                throw new MQBrokerException(exception.getResponseCode(), exception.getErrorMessage());
-            } else {
-                getFromBrokers = true;
-            }
-        }
-        if (!getFromBrokers) {
-            if (routeData != null
-                    && !routeData.getQueueDatas().isEmpty()) {
-                clientMetadata.freshTopicRoute(topic, routeData);
-                for (QueueData queueData: routeData.getQueueDatas()) {
-                    String bname = queueData.getBrokerName();
-                    String addr = clientMetadata.findMasterBrokerAddr(bname);
-                    try {
-                        TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) examineTopicConfig(addr, topic);
-                        //allow the config is null
-                        if (mapping != null) {
-                            brokerConfigMap.put(bname, mapping);
-                        }
-                    } catch (MQBrokerException exception) {
-                        if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
-                            throw exception;
-                        }
-                    }
-
-                }
-            }
-        } else {
-            log.info("The topic {} dose not exist in nameserver, so check it from all brokers", topic);
-            //if cannot get from nameserver, then check all the brokers
-            ClusterInfo clusterInfo = examineBrokerClusterInfo();
-            if (clusterInfo != null
-                    && clusterInfo.getBrokerAddrTable() != null) {
-                clientMetadata.refreshClusterInfo(clusterInfo);
-            }
-            for (Entry<String, HashMap<Long, String>> entry : clientMetadata.getBrokerAddrTable().entrySet()) {
-                String bname = entry.getKey();
-                HashMap<Long, String> map = entry.getValue();
-                String addr = map.get(MixAll.MASTER_ID);
-                if (addr != null) {
-                    try {
-                        TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) examineTopicConfig(addr, topic);
-                        //allow the config is null
-                        if (mapping != null) {
-                            brokerConfigMap.put(bname, mapping);
-                        }
-                    }  catch (MQBrokerException exception1) {
-                        if (exception1.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
-                            throw exception1;
-                        }
-                    }
-                }
-            }
-        }
-        return brokerConfigMap;
-    }
-
-    @Override
     public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
         return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp);
     }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 6c27443..c4838e3 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -320,7 +320,4 @@ public interface MQAdminExt extends MQAdmin {
 
     void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, InterruptedException, MQBrokerException;
 
-    Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQBrokerException;
-
-    void remappingStaticTopic(ClientMetadata clientMetadata, String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, int blockSeqSize, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
 }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
new file mode 100644
index 0000000..288bac4
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
@@ -0,0 +1,254 @@
+package org.apache.rocketmq.tools.admin;
+
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.admin.TopicOffset;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.rpc.ClientMetadata;
+import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
+import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
+import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class MQAdminUtils {
+
+
+    /**
+     * Builds a new {@link ClientMetadata} and loads the current cluster info into it.
+     *
+     * @param defaultMQAdminExt admin client used to query the cluster info
+     * @return the freshly populated metadata
+     * @throws RuntimeException if the cluster info is missing or has no cluster entries
+     */
+    public static ClientMetadata getBrokerMetadata(DefaultMQAdminExt defaultMQAdminExt) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException {
+        final ClientMetadata metadata = new ClientMetadata();
+        MQAdminUtils.refreshClusterInfo(defaultMQAdminExt, metadata);
+        return metadata;
+    }
+
+    /**
+     * Builds a new {@link ClientMetadata} holding both the cluster info and, when available,
+     * the route of the given topic.
+     *
+     * @param topic             topic whose route should be loaded
+     * @param defaultMQAdminExt admin client used for the lookups
+     * @return the freshly populated metadata
+     * @throws RuntimeException if the cluster info is missing or has no cluster entries
+     */
+    public static ClientMetadata getBrokerAndTopicMetadata(String topic, DefaultMQAdminExt defaultMQAdminExt) throws InterruptedException, RemotingException, MQBrokerException {
+        // cluster info first, then the topic route on top of it
+        final ClientMetadata metadata = getBrokerMetadata(defaultMQAdminExt);
+        refreshTopicRouteInfo(topic, defaultMQAdminExt, metadata);
+        return metadata;
+    }
+
+    /**
+     * Queries the cluster info and stores it into the given metadata.
+     *
+     * @param defaultMQAdminExt admin client used to query the cluster info
+     * @param clientMetadata    metadata to refresh
+     * @throws RuntimeException if the cluster info is null or its cluster table is empty
+     */
+    public static void refreshClusterInfo(DefaultMQAdminExt defaultMQAdminExt, ClientMetadata clientMetadata) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        final ClusterInfo info = defaultMQAdminExt.examineBrokerClusterInfo();
+        final boolean usable = info != null && !info.getClusterAddrTable().isEmpty();
+        if (!usable) {
+            throw new RuntimeException("The Cluster info is empty");
+        }
+        clientMetadata.refreshClusterInfo(info);
+    }
+
+    /**
+     * Loads the route of {@code topic} into the given metadata.
+     * <p>
+     * Nothing is refreshed when the topic is reported as {@code TOPIC_NOT_EXIST}, or when the
+     * returned route is null or has no queue data. Any other {@link MQClientException} is
+     * rethrown as an {@link MQBrokerException} carrying the same response code and message.
+     *
+     * @param topic             topic whose route should be loaded
+     * @param defaultMQAdminExt admin client used to query the route
+     * @param clientMetadata    metadata to refresh
+     */
+    public static void refreshTopicRouteInfo(String topic, DefaultMQAdminExt defaultMQAdminExt, ClientMetadata clientMetadata) throws RemotingException, InterruptedException, MQBrokerException {
+        final TopicRouteData route;
+        try {
+            route = defaultMQAdminExt.examineTopicRouteInfo(topic);
+        } catch (MQClientException e) {
+            if (e.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
+                throw new MQBrokerException(e.getResponseCode(), e.getErrorMessage());
+            }
+            // unknown topic: there is no route to record
+            return;
+        }
+        if (route == null || route.getQueueDatas().isEmpty()) {
+            return;
+        }
+        clientMetadata.freshTopicRoute(topic, route);
+    }
+
+    /**
+     * Expands the given broker names to every broker that shares a cluster with one of them.
+     * <p>
+     * For each input broker, the first cluster whose broker set contains it contributes all of
+     * its brokers to the result. Input brokers found in no cluster are not part of the result.
+     *
+     * @param brokers           broker names to expand
+     * @param defaultMQAdminExt admin client used to query the cluster info
+     * @return the union of the broker sets of the matching clusters
+     * @throws RuntimeException if the cluster info is null or its cluster table is empty
+     */
+    public static Set<String> getAllBrokersInSameCluster(Collection<String> brokers, DefaultMQAdminExt defaultMQAdminExt) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        final ClusterInfo info = defaultMQAdminExt.examineBrokerClusterInfo();
+        if (info == null || info.getClusterAddrTable().isEmpty()) {
+            throw new RuntimeException("The Cluster info is empty");
+        }
+        final Collection<? extends Set<String>> clusters = info.getClusterAddrTable().values();
+        final Set<String> result = new HashSet<>();
+        for (String brokerName : brokers) {
+            // already pulled in through a previously matched cluster
+            if (!result.contains(brokerName)) {
+                for (Set<String> members : clusters) {
+                    if (members.contains(brokerName)) {
+                        result.addAll(members);
+                        break;
+                    }
+                }
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Adds a placeholder entry to {@code brokerConfigMap} for every broker that shares a cluster
+     * with the brokers already in the map but has no entry of its own.
+     * <p>
+     * Topic name, total queue number and epoch are copied from an arbitrary existing entry. Each
+     * placeholder is built from {@code new TopicConfig(topic, 0, 0)} and a
+     * {@link TopicQueueMappingDetail} for that broker. The map is modified in place.
+     *
+     * @param brokerConfigMap   broker name to config/mapping; must contain at least one entry
+     * @param defaultMQAdminExt admin client used to query the cluster info
+     * @throws RuntimeException if {@code brokerConfigMap} is null or empty, or if the cluster
+     *                          info is null or its cluster table is empty
+     */
+    public static void completeNoTargetBrokers(Map<String, TopicConfigAndQueueMapping> brokerConfigMap, DefaultMQAdminExt defaultMQAdminExt) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException {
+        // An existing entry is needed as the template; fail with a clear message instead of a bare NoSuchElementException.
+        if (brokerConfigMap == null || brokerConfigMap.isEmpty()) {
+            throw new RuntimeException("The broker config map is empty, cannot complete the non-target brokers");
+        }
+        TopicConfigAndQueueMapping configMapping = brokerConfigMap.values().iterator().next();
+        String topic = configMapping.getTopicName();
+        int queueNum = configMapping.getMappingDetail().getTotalQueues();
+        long newEpoch = configMapping.getMappingDetail().getEpoch();
+        Set<String> allBrokers = getAllBrokersInSameCluster(brokerConfigMap.keySet(), defaultMQAdminExt);
+        for (String broker : allBrokers) {
+            if (!brokerConfigMap.containsKey(broker)) {
+                brokerConfigMap.put(broker, new TopicConfigAndQueueMapping(new TopicConfig(topic, 0, 0), new TopicQueueMappingDetail(topic, queueNum, broker, newEpoch)));
+            }
+        }
+    }
+
+    /**
+     * Verifies that the metadata knows a master address for every given broker.
+     *
+     * @param brokers           broker names to check
+     * @param defaultMQAdminExt not used by this check
+     * @param clientMetadata    metadata used to resolve the master addresses
+     * @throws RuntimeException for the first broker without a master address
+     */
+    public static void checkIfMasterAlive(Collection<String> brokers, DefaultMQAdminExt defaultMQAdminExt, ClientMetadata clientMetadata) {
+        for (String brokerName : brokers) {
+            if (clientMetadata.findMasterBrokerAddr(brokerName) == null) {
+                throw new RuntimeException("Can't find addr for broker " + brokerName);
+            }
+        }
+    }
+
+    /**
+     * Pushes every entry of {@code brokerConfigMap} to the master of its broker.
+     * <p>
+     * All master addresses are resolved and checked before the first write. The writes themselves
+     * are sequential and not atomic: a failure part-way leaves the brokers inconsistent.
+     *
+     * @param brokerConfigMap   broker name to the config/mapping to write
+     * @param defaultMQAdminExt admin client used for the lookups and writes
+     * @param force             forwarded to {@code createStaticTopic}
+     */
+    public static void updateTopicConfigMappingAll(Map<String, TopicConfigAndQueueMapping> brokerConfigMap, DefaultMQAdminExt defaultMQAdminExt, boolean force) throws Exception {
+        final ClientMetadata metadata = getBrokerMetadata(defaultMQAdminExt);
+        checkIfMasterAlive(brokerConfigMap.keySet(), defaultMQAdminExt, metadata);
+        for (Map.Entry<String, TopicConfigAndQueueMapping> brokerEntry : brokerConfigMap.entrySet()) {
+            final String masterAddr = metadata.findMasterBrokerAddr(brokerEntry.getKey());
+            final TopicConfigAndQueueMapping mapping = brokerEntry.getValue();
+            defaultMQAdminExt.createStaticTopic(masterAddr, defaultMQAdminExt.getCreateTopicKey(), mapping, mapping.getMappingDetail(), force);
+        }
+    }
+
+    /**
+     * Applies a precomputed remapping of a static topic to the brokers, in five ordered steps.
+     * <p>
+     * Cluster info is fetched and a master address is required for every broker in
+     * {@code brokerConfigMap} before anything is written. The steps are sequential remote calls
+     * with no rollback; a failure part-way leaves the brokers with partially updated configs.
+     *
+     * @param topic             the static topic being remapped
+     * @param brokersToMapIn    brokers that receive queues (the new leaders)
+     * @param brokersToMapOut   brokers that give queues away (the old leaders)
+     * @param brokerConfigMap   broker name to the new config/mapping; entries are updated in
+     *                          place with the logic offsets decided in step 3
+     * @param blockSeqSize      passed to {@code TopicQueueMappingUtils.blockSeqRoundUp} when the
+     *                          new leader's logic offset is computed
+     * @param force             forwarded to every {@code createStaticTopic} call
+     * @param defaultMQAdminExt admin client used for all lookups and writes
+     * @throws RuntimeException if a master address is missing, if the old leader's offset cannot
+     *                          be found in the topic stats, or if its max offset is below the
+     *                          start offset of the old mapping item
+     */
+    public static void remappingStaticTopic(String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, int blockSeqSize, boolean force, DefaultMQAdminExt defaultMQAdminExt) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+
+        ClientMetadata clientMetadata = MQAdminUtils.getBrokerMetadata(defaultMQAdminExt);
+        MQAdminUtils.checkIfMasterAlive(brokerConfigMap.keySet(), defaultMQAdminExt, clientMetadata);
+        // now do the remapping
+        //Step1: write the config to the new leaders before their logic offset is decided, so that they can be written to
+        for (String broker: brokersToMapIn) {
+            String addr = clientMetadata.findMasterBrokerAddr(broker);
+            TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
+            defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
+        }
+        //Step2: write the config to the old leaders, which forbids further writes to them
+        for (String broker: brokersToMapOut) {
+            String addr = clientMetadata.findMasterBrokerAddr(broker);
+            TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
+            defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
+        }
+        //Step3: decide the logic offset of each new leader from the max offset of its old leader
+        for (String broker: brokersToMapOut) {
+            String addr = clientMetadata.findMasterBrokerAddr(broker);
+            TopicStatsTable statsTable = defaultMQAdminExt.examineTopicStats(addr, topic);
+            TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(broker);
+            for (Map.Entry<Integer, List<LogicQueueMappingItem>> entry : mapOutConfig.getMappingDetail().getHostedQueues().entrySet()) {
+                List<LogicQueueMappingItem> items = entry.getValue();
+                Integer globalId = entry.getKey();
+                // fewer than two items means there is no old-leader/new-leader pair for this queue
+                if (items.size() < 2) {
+                    continue;
+                }
+                // the last item is the new leader, the one before it is the old leader
+                LogicQueueMappingItem newLeader = items.get(items.size() - 1);
+                LogicQueueMappingItem oldLeader = items.get(items.size() - 2);
+                // a positive logic offset has already been decided, leave it untouched
+                if (newLeader.getLogicOffset()  > 0) {
+                    continue;
+                }
+                TopicOffset topicOffset = statsTable.getOffsetTable().get(new MessageQueue(topic, oldLeader.getBname(), oldLeader.getQueueId()));
+                if (topicOffset == null) {
+                    throw new RuntimeException("Cannot get the max offset for old leader " + oldLeader);
+                }
+                //TODO check the max offset, will it return -1?
+                if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) {
+                    throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset());
+                }
+                // translate the old leader's max offset to a static queue offset and round it up by blockSeqSize
+                newLeader.setLogicOffset(TopicQueueMappingUtils.blockSeqRoundUp(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset()), blockSeqSize));
+                // NOTE(review): assumes brokerConfigMap has an entry for the new leader's broker; a missing entry fails on the next line - confirm
+                TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(newLeader.getBname());
+                //refresh the mapping held for the new leader's broker
+                TopicQueueMappingDetail.putMappingInfo(mapInConfig.getMappingDetail(), globalId, items);
+            }
+        }
+        //Step4: write to the new leaders again, now with the decided logic offset
+        for (String broker: brokersToMapIn) {
+            String addr = clientMetadata.findMasterBrokerAddr(broker);
+            TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
+            defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
+        }
+        //Step5: write the non-target brokers, i.e. those that are neither mapped in nor mapped out
+        for (String broker: brokerConfigMap.keySet()) {
+            if (brokersToMapIn.contains(broker) || brokersToMapOut.contains(broker)) {
+                continue;
+            }
+            String addr = clientMetadata.findMasterBrokerAddr(broker);
+            TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
+            defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
+        }
+    }
+
+    /**
+     * Collects the config and queue mapping of {@code topic} from the master brokers.
+     * <p>
+     * The topic route is looked up first. When it is found, only the brokers listed in the route
+     * are asked. When the lookup reports {@code TOPIC_NOT_EXIST}, every broker with a known
+     * master address in the cluster info is asked instead. Brokers that answer
+     * {@code TOPIC_NOT_EXIST} or return no config are left out of the result.
+     * <p>
+     * NOTE(review): on the route path the master address is not checked for null before it is
+     * used, unlike on the all-brokers path - confirm whether that is intended.
+     *
+     * @param topic             topic to examine
+     * @param defaultMQAdminExt admin client used for all lookups
+     * @return broker name to config/mapping; empty when no broker reports the topic
+     * @throws MQBrokerException if the route lookup or a broker query fails with a code other
+     *                           than {@code TOPIC_NOT_EXIST}
+     */
+    public static Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(String topic, DefaultMQAdminExt defaultMQAdminExt) throws RemotingException,  InterruptedException, MQBrokerException {
+        Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
+        ClientMetadata clientMetadata = new ClientMetadata();
+        boolean getFromBrokers = false;
+        TopicRouteData routeData = null;
+        try {
+            routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+        } catch (MQClientException exception) {
+            if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
+                throw new MQBrokerException(exception.getResponseCode(), exception.getErrorMessage());
+            }
+            getFromBrokers = true;
+        }
+        if (!getFromBrokers) {
+            if (routeData != null
+                    && !routeData.getQueueDatas().isEmpty()) {
+                clientMetadata.freshTopicRoute(topic, routeData);
+                for (QueueData queueData : routeData.getQueueDatas()) {
+                    String bname = queueData.getBrokerName();
+                    String addr = clientMetadata.findMasterBrokerAddr(bname);
+                    TopicConfigAndQueueMapping mapping = examineTopicConfigIfExists(addr, topic, defaultMQAdminExt);
+                    //allow the config is null
+                    if (mapping != null) {
+                        brokerConfigMap.put(bname, mapping);
+                    }
+                }
+            }
+        } else {
+            //if cannot get from nameserver, then check all the brokers
+            ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+            if (clusterInfo != null
+                    && clusterInfo.getBrokerAddrTable() != null) {
+                clientMetadata.refreshClusterInfo(clusterInfo);
+            }
+            for (Map.Entry<String, HashMap<Long, String>> entry : clientMetadata.getBrokerAddrTable().entrySet()) {
+                String bname = entry.getKey();
+                String addr = entry.getValue().get(MixAll.MASTER_ID);
+                // brokers without a known master are skipped on this path
+                if (addr != null) {
+                    TopicConfigAndQueueMapping mapping = examineTopicConfigIfExists(addr, topic, defaultMQAdminExt);
+                    //allow the config is null
+                    if (mapping != null) {
+                        brokerConfigMap.put(bname, mapping);
+                    }
+                }
+            }
+        }
+        return brokerConfigMap;
+    }
+
+    /**
+     * Fetches the topic config from one broker, returning null instead of failing when the
+     * broker answers {@code TOPIC_NOT_EXIST}.
+     */
+    private static TopicConfigAndQueueMapping examineTopicConfigIfExists(String addr, String topic, DefaultMQAdminExt defaultMQAdminExt) throws RemotingException, InterruptedException, MQBrokerException {
+        try {
+            return (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
+        } catch (MQBrokerException exception) {
+            if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
+                throw exception;
+            }
+            return null;
+        }
+    }
+}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
index b516f1c..9a6f15a 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
@@ -20,6 +20,7 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionGroup;
 import org.apache.commons.cli.Options;
+import org.apache.rocketmq.client.MQAdmin;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
@@ -30,6 +31,7 @@ import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.srvutil.ServerUtil;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.MQAdminUtils;
 import org.apache.rocketmq.tools.command.SubCommand;
 import org.apache.rocketmq.tools.command.SubCommandException;
 
@@ -87,7 +89,6 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
 
         DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
         defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
-        ClientMetadata clientMetadata = new ClientMetadata();
 
         try {
             String topic = commandLine.getOptionValue('t').trim();
@@ -99,24 +100,12 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
             TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, wrapper.getBrokerConfigMap());
             TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true);
 
-            ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
-            if (clusterInfo == null
-                    || clusterInfo.getClusterAddrTable().isEmpty()) {
-                throw new RuntimeException("The Cluster info is empty");
-            }
-            clientMetadata.refreshClusterInfo(clusterInfo);
 
             boolean force = false;
             if (commandLine.hasOption("fr") && Boolean.parseBoolean(commandLine.getOptionValue("fr").trim())) {
                 force = true;
             }
-            for (String broker : wrapper.getBrokerConfigMap().keySet()) {
-                String addr = clientMetadata.findMasterBrokerAddr(broker);
-                if (addr == null) {
-                    throw new RuntimeException("Can't find addr for broker " + broker);
-                }
-            }
-            defaultMQAdminExt.remappingStaticTopic(clientMetadata, topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), 10000, force);
+            MQAdminUtils.remappingStaticTopic(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), 10000, force, defaultMQAdminExt);
             return;
         }catch (Exception e) {
             throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
@@ -159,7 +148,6 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
                     || clusterInfo.getClusterAddrTable().isEmpty()) {
                 throw new RuntimeException("The Cluster info is empty");
             }
-            clientMetadata.refreshClusterInfo(clusterInfo);
             {
                 if (commandLine.hasOption("b")) {
                     String brokerStrs = commandLine.getOptionValue("b").trim();
@@ -186,7 +174,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
                 }
             }
 
-            brokerConfigMap  = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
+            brokerConfigMap  = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
             if (brokerConfigMap.isEmpty()) {
                 throw new RuntimeException("No topic route to do the remapping");
             }
@@ -204,7 +192,9 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
                 System.out.println("The old mapping data is written to file " + newMappingDataFile);
             }
 
-            defaultMQAdminExt.remappingStaticTopic(clientMetadata, topic, newWrapper.getBrokerToMapIn(), newWrapper.getBrokerToMapOut(), newWrapper.getBrokerConfigMap(), 10000, false);
+            MQAdminUtils.completeNoTargetBrokers(newWrapper.getBrokerConfigMap(), defaultMQAdminExt);
+
+            MQAdminUtils.remappingStaticTopic(topic, newWrapper.getBrokerToMapIn(), newWrapper.getBrokerToMapOut(), newWrapper.getBrokerConfigMap(), 10000, false, defaultMQAdminExt);
 
         } catch (Exception e) {
             throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index a32dc8d..958922e 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.srvutil.ServerUtil;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.MQAdminUtils;
 import org.apache.rocketmq.tools.command.SubCommand;
 import org.apache.rocketmq.tools.command.SubCommandException;
 
@@ -106,14 +107,8 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
             }
             TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true);
 
-            ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
-            if (clusterInfo == null
-                    || clusterInfo.getClusterAddrTable().isEmpty()) {
-                throw new RuntimeException("The Cluster info is empty");
-            }
-            clientMetadata.refreshClusterInfo(clusterInfo);
-
-            doUpdate(wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt, force);
+            MQAdminUtils.completeNoTargetBrokers(wrapper.getBrokerConfigMap(), defaultMQAdminExt);
+            MQAdminUtils.updateTopicConfigMappingAll(wrapper.getBrokerConfigMap(), defaultMQAdminExt, false);
             return;
         }catch (Exception e) {
             throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
@@ -122,22 +117,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
         }
     }
 
-    public void doUpdate(Map<String, TopicConfigAndQueueMapping> brokerConfigMap, ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt, boolean force) throws Exception {
-        //check it before
-        for (String broker : brokerConfigMap.keySet()) {
-            String addr = clientMetadata.findMasterBrokerAddr(broker);
-            if (addr == null) {
-                throw new RuntimeException("Can't find addr for broker " + broker);
-            }
-        }
-        //If some succeed, and others fail, it will cause inconsistent data
-        for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
-            String broker = entry.getKey();
-            String addr = clientMetadata.findMasterBrokerAddr(broker);
-            TopicConfigAndQueueMapping configMapping = entry.getValue();
-            defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
-        }
-    }
 
 
     @Override
@@ -155,7 +134,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
 
         DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
         defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
-        ClientMetadata clientMetadata = new ClientMetadata();
 
         Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
         Set<String> targetBrokers = new HashSet<>();
@@ -173,7 +151,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
                     || clusterInfo.getClusterAddrTable().isEmpty()) {
                 throw new RuntimeException("The Cluster info is empty");
             }
-            clientMetadata.refreshClusterInfo(clusterInfo);
             {
                 if (commandLine.hasOption("b")) {
                     String brokerStrs = commandLine.getOptionValue("b").trim();
@@ -192,17 +169,11 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
                 if (targetBrokers.isEmpty()) {
                     throw new RuntimeException("Find none brokers, do nothing");
                 }
-                for (String broker : targetBrokers) {
-                    String addr = clientMetadata.findMasterBrokerAddr(broker);
-                    if (addr == null) {
-                        throw new RuntimeException("Can't find addr for broker " + broker);
-                    }
-                }
             }
 
             //get the existed topic config and mapping
 
-            brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
+            brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
             int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim());
 
             Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), queueNum);
@@ -219,15 +190,15 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
             targetBrokers.addAll(brokerConfigMap.keySet());
 
             //calculate the new data
-            TopicRemappingDetailWrapper newWrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, new HashSet<>(), brokerConfigMap);
+            TopicRemappingDetailWrapper newWrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
 
             {
                 String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, true);
                 System.out.println("The new mapping data is written to file " + newMappingDataFile);
             }
 
-            doUpdate(newWrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt, false);
-
+            MQAdminUtils.completeNoTargetBrokers(brokerConfigMap, defaultMQAdminExt);
+            MQAdminUtils.updateTopicConfigMappingAll(brokerConfigMap, defaultMQAdminExt, false);
         } catch (Exception e) {
             throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
         } finally {