You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/30 13:16:40 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Add the test for remapping static topic

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new f7f32e7  Add the test for remapping static topic
f7f32e7 is described below

commit f7f32e70633f856d6eb9dddea148e624fca971dc
Author: dongeforever <do...@apache.org>
AuthorDate: Tue Nov 30 21:15:27 2021 +0800

    Add the test for remapping static topic
---
 .../apache/rocketmq/broker/BrokerController.java   |  13 +--
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java |   7 ++
 .../broker/processor/AdminBrokerProcessor.java     |  34 ++++--
 .../broker/processor/PullMessageProcessor.java     |  90 +++++++++++----
 .../broker/topic/TopicQueueMappingManager.java     |  13 ++-
 .../client/impl/factory/MQClientInstance.java      |   2 +
 .../apache/rocketmq/common/rpc/RpcClientImpl.java  |  45 +++++---
 .../apache/rocketmq/common/rpc/RpcClientUtils.java |   3 +
 .../statictopic/TopicQueueMappingContext.java      |  34 ++++++
 .../statictopic/TopicQueueMappingDetail.java       |   3 +-
 .../common/statictopic/TopicQueueMappingUtils.java |  14 ++-
 .../rocketmq/common/sysflag/PullSysFlag.java       |   4 +
 .../common/statictopic/TopicQueueMappingTest.java  |   7 ++
 ...lsTest.java => TopicQueueMappingUtilsTest.java} |   2 +-
 .../remoting/protocol/RemotingCommand.java         |  15 ++-
 .../remoting/protocol/RemotingCommandTest.java     |  97 +++++++++++++++-
 .../test/client/rmq/RMQNormalConsumer.java         |   5 +
 .../test/client/rmq/RMQNormalProducer.java         |   1 +
 .../org/apache/rocketmq/test/base/BaseConf.java    |   6 +-
 .../apache/rocketmq/test/smoke/StaticTopicIT.java  | 127 ++++++++++++++++++++-
 .../tools/admin/DefaultMQAdminExtImpl.java         |   9 ++
 21 files changed, 462 insertions(+), 69 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 646c530..9230d95 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -492,11 +492,11 @@ public class BrokerController {
 
             this.scheduledExecutorService.scheduleAtFixedRate(() -> {
                 try {
-                    BrokerController.this.refreshBrokerNameMapping();
+                    BrokerController.this.brokerOuterAPI.refreshMetadata();
                 } catch (Exception e) {
-                    log.error("ScheduledTask examineBrokerClusterInfo exception", e);
+                    log.error("ScheduledTask refresh metadata exception", e);
                 }
-            }, 10, 10, TimeUnit.SECONDS);
+            }, 1, 5, TimeUnit.SECONDS);
 
             if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                 if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
@@ -624,13 +624,6 @@ public class BrokerController {
         }
     }
 
-    private void refreshBrokerNameMapping() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
-        ClusterInfo brokerClusterInfo = this.brokerOuterAPI.getBrokerClusterInfo();
-        brokerClusterInfo.getBrokerAddrTable().forEach((brokerName, data) -> {
-            String masterBrokerAddr = data.getBrokerAddrs().get(MixAll.MASTER_ID);
-            this.brokerName2AddrMap.put(brokerName, masterBrokerAddr);
-        });
-    }
 
     public String getBrokerAddrByName(String brokerName) {
         return this.brokerName2AddrMap.get(brokerName);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 384eef0..e9ef46e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import com.alibaba.fastjson.JSON;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
 import org.apache.rocketmq.client.exception.MQBrokerException;
@@ -477,6 +478,12 @@ public class BrokerOuterAPI {
     }
 
 
+    public void refreshMetadata() throws Exception {
+        ClusterInfo brokerClusterInfo = getBrokerClusterInfo();
+        clientMetadata.refreshClusterInfo(brokerClusterInfo);
+    }
+
+
     public ClientMetadata getClientMetadata() {
         return clientMetadata;
     }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index a4e3fd4..d9684e7 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -621,13 +621,16 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
             if (mappingContext.getMappingDetail() == null) {
                 return null;
             }
+            if (requestHeader.getPhysical() != null && requestHeader.getPhysical()) {
+                return null;
+            }
+
             TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
-            LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
-            if (mappingItem == null
-                    || !mappingDetail.getBname().equals(mappingItem.getBname())) {
+            List<LogicQueueMappingItem> mappingItems = mappingContext.getMappingItemList();
+            if (mappingItems == null
+                    || mappingItems.isEmpty()) {
                 return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
             }
-            List<LogicQueueMappingItem> mappingItems = mappingContext.getMappingItemList();
             //TODO should make sure the timestampOfOffset is equal or bigger than the searched timestamp
             Long timestamp = requestHeader.getTimestamp();
             long offset = -1;
@@ -699,6 +702,10 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         if (mappingContext.getMappingDetail() == null) {
             return null;
         }
+        if (requestHeader.getPhysical() != null && requestHeader.getPhysical()) {
+            return null;
+        }
+
         TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
         LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
         if (mappingItem == null
@@ -743,10 +750,14 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         if (mappingContext.getMappingDetail() == null) {
             return null;
         }
+        if (requestHeader.getPhysical() != null && requestHeader.getPhysical()) {
+            return null;
+        }
+
         TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
         LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
-        if (mappingItem == null
-                || !mappingDetail.getBname().equals(mappingItem.getBname())) {
+        if (mappingItem == null) {
+            //this may not
             return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
         };
         try {
@@ -774,7 +785,9 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
             response.setCode(ResponseCode.SUCCESS);
             response.setRemark(null);
             return response;
-        }catch (Throwable t) {
+        } catch (Throwable t) {
+            t.printStackTrace();
+            log.error("rewriteRequestForStaticTopic failed", t);
             return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
         }
     }
@@ -786,6 +799,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         final GetMinOffsetRequestHeader requestHeader =
             (GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
 
+
         TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false, 0L);
         RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
         if (rewriteResult != null) {
@@ -804,10 +818,12 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         if (mappingContext.getMappingDetail() == null) {
             return null;
         }
+        if (requestHeader.getPhysical() != null && requestHeader.getPhysical()) {
+            return null;
+        }
         TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
         LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
-        if (mappingItem == null
-                || !mappingDetail.getBname().equals(mappingItem.getBname())) {
+        if (mappingItem == null) {
             return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
         };
         try {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 6388c06..b98295c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -68,6 +68,7 @@ import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.netty.RequestTask;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.store.GetMessageResult;
+import org.apache.rocketmq.store.GetMessageStatus;
 import org.apache.rocketmq.store.MessageExtBrokerInner;
 import org.apache.rocketmq.store.MessageFilter;
 import org.apache.rocketmq.store.PutMessageResult;
@@ -78,6 +79,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
+import static org.apache.rocketmq.remoting.protocol.RemotingCommand.decode;
 
 public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -134,6 +136,11 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
                     && requestHeader.getMaxMsgNums() != null) {
                 requestHeader.setMaxMsgNums((int) Math.min(mappingItem.getEndOffset() - mappingItem.getStartOffset(), requestHeader.getMaxMsgNums()));
             }
+            int sysFlag = requestHeader.getSysFlag();
+            if (!mappingContext.isLeader()) {
+                sysFlag = PullSysFlag.clearCommitOffsetFlag(sysFlag);
+                requestHeader.setSysFlag(sysFlag);
+            }
 
             if (mappingDetail.getBname().equals(bname)) {
                 //just let it go, do the local pull process
@@ -142,6 +149,9 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
 
             requestHeader.setPhysical(true);
             requestHeader.setBname(bname);
+            sysFlag = PullSysFlag.clearSuspendFlag(sysFlag);
+            sysFlag = PullSysFlag.clearCommitOffsetFlag(sysFlag);
+            requestHeader.setSysFlag(sysFlag);
             RpcRequest rpcRequest = new RpcRequest(RequestCode.PULL_MESSAGE, requestHeader, null);
             RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
             if (rpcResponse.getException() != null) {
@@ -150,7 +160,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
 
             PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) rpcResponse.getHeader();
             {
-                RemotingCommand rewriteResult =  rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext);
+                RemotingCommand rewriteResult =  rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext, rpcResponse.getCode());
                 if (rewriteResult != null) {
                     return rewriteResult;
                 }
@@ -161,35 +171,73 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
         }
     }
 
-    private RemotingCommand rewriteResponseForStaticTopic(PullMessageRequestHeader requestHeader, PullMessageResponseHeader responseHeader, TopicQueueMappingContext mappingContext) {
+    private RemotingCommand rewriteResponseForStaticTopic(PullMessageRequestHeader requestHeader, PullMessageResponseHeader responseHeader,
+                                                          TopicQueueMappingContext mappingContext, final int code) {
         try {
             if (mappingContext == null) {
                 return null;
             }
             TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
             LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
-            //handle nextBeginOffset
-            {
-                long nextBeginOffset = responseHeader.getNextBeginOffset();
-                assert nextBeginOffset >= requestHeader.getQueueOffset();
-                //the next begin offset should no more than the end offset
-                if (mappingItem.checkIfEndOffsetDecided()
-                        && nextBeginOffset >= mappingItem.getEndOffset()) {
-                    nextBeginOffset = mappingItem.getEndOffset();
+
+            long requestOffset = requestHeader.getQueueOffset();
+            long nextBeginOffset = responseHeader.getNextBeginOffset();
+            long minOffset = responseHeader.getMinOffset();
+            long maxOffset = responseHeader.getMaxOffset();
+            int responseCode = code;
+            if (responseCode != ResponseCode.SUCCESS
+                    && responseCode != ResponseCode.PULL_RETRY_IMMEDIATELY) {
+                if (mappingContext.isLeader()) {
+                    if (requestOffset < minOffset) {
+                        nextBeginOffset = minOffset;
+                        responseCode = ResponseCode.PULL_NOT_FOUND;
+                    } else if (requestOffset > maxOffset) {
+                        responseCode = ResponseCode.PULL_OFFSET_MOVED;
+                    } else if (requestOffset == maxOffset) {
+                        responseCode = ResponseCode.PULL_NOT_FOUND;
+                    } else {
+                        //let it go
+                    }
+                } else {
+                    if (requestOffset < minOffset) {
+                        nextBeginOffset = minOffset;
+                        responseCode = ResponseCode.PULL_NOT_FOUND;
+                    } else if (requestOffset >= maxOffset) {
+                        responseCode = ResponseCode.PULL_NOT_FOUND;
+                        //just move to another item
+                        mappingItem = mappingContext.findNext();
+                        assert  mappingItem != null;
+                        nextBeginOffset = mappingItem.getStartOffset();
+                        minOffset = mappingItem.getStartOffset();
+                        maxOffset = minOffset;
+                    }
                 }
-                responseHeader.setNextBeginOffset(mappingItem.computeStaticQueueOffsetUpToEnd(nextBeginOffset));
             }
+
+            //handle nextBeginOffset
+            //the next begin offset should no more than the end offset
+            if (mappingItem.checkIfEndOffsetDecided()
+                    && nextBeginOffset >= mappingItem.getEndOffset()) {
+                nextBeginOffset = mappingItem.getEndOffset();
+            }
+            responseHeader.setNextBeginOffset(mappingItem.computeStaticQueueOffsetUpToEnd(nextBeginOffset));
             //handle min offset
-            responseHeader.setMinOffset(mappingItem.computeStaticQueueOffsetUpToEnd(Math.max(mappingItem.getStartOffset(), responseHeader.getMinOffset())));
+            responseHeader.setMinOffset(mappingItem.computeStaticQueueOffsetUpToEnd(Math.max(mappingItem.getStartOffset(), minOffset)));
             //handle max offset
-            responseHeader.setMaxOffset(Math.max(mappingItem.computeStaticQueueOffsetUpToEnd(responseHeader.getMaxOffset()),
+            responseHeader.setMaxOffset(Math.max(mappingItem.computeStaticQueueOffsetUpToEnd(maxOffset),
                     TopicQueueMappingDetail.computeMaxOffsetFromMapping(mappingDetail, mappingContext.getGlobalId())));
             //set the offsetDelta
             responseHeader.setOffsetDelta(mappingItem.computeOffsetDelta());
+
+            if (code != ResponseCode.SUCCESS
+                && code != ResponseCode.PULL_RETRY_IMMEDIATELY) {
+                return RemotingCommand.createResponseCommandWithHeader(responseCode, responseHeader);
+            } else {
+                return null;
+            }
         } catch (Throwable t) {
             return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
         }
-        return null;
     }
 
 
@@ -440,13 +488,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
                     break;
             }
 
-            //rewrite the response for the
-            {
-                RemotingCommand rewriteResult =  rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext);
-                if (rewriteResult != null) {
-                    return rewriteResult;
-                }
-            }
+
 
 
             if (this.hasConsumeMessageHook()) {
@@ -491,6 +533,12 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
                 this.executeConsumeMessageHookBefore(context);
             }
 
+            //rewrite the response for the
+            RemotingCommand rewriteResult =  rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext, response.getCode());
+            if (rewriteResult != null) {
+                response = rewriteResult;
+            }
+
             switch (response.getCode()) {
                 case ResponseCode.SUCCESS:
 
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index 4b9d328..5fa3a31 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -82,6 +82,7 @@ public class TopicQueueMappingManager extends ConfigManager {
                 return;
             }
             if (force) {
+                //bakeup the old items
                 oldDetail.getHostedQueues().forEach( (queueId, items) -> {
                     newDetail.getHostedQueues().putIfAbsent(queueId, items);
                 });
@@ -90,17 +91,21 @@ public class TopicQueueMappingManager extends ConfigManager {
                 return;
             }
             //do more check
-            if (newDetail.getEpoch() <= oldDetail.getEpoch()) {
+            if (newDetail.getEpoch() < oldDetail.getEpoch()) {
                 throw new RuntimeException(String.format("Can't accept data with small epoch %d < %d", newDetail.getEpoch(), oldDetail.getEpoch()));
             }
+            boolean epochEqual = newDetail.getEpoch() == oldDetail.getEpoch();
             for (Integer globalId : oldDetail.getHostedQueues().keySet()) {
                 List<LogicQueueMappingItem> oldItems = oldDetail.getHostedQueues().get(globalId);
                 List<LogicQueueMappingItem> newItems = newDetail.getHostedQueues().get(globalId);
                 if (newItems == null) {
-                    //keep the old
-                    newDetail.getHostedQueues().put(globalId, oldItems);
+                    if (epochEqual) {
+                        throw new RuntimeException("Cannot accept equal epoch with null data");
+                    } else {
+                        newDetail.getHostedQueues().put(globalId, oldItems);
+                    }
                 } else {
-                    TopicQueueMappingUtils.makeSureLogicQueueMappingItemImmutable(oldItems, newItems);
+                    TopicQueueMappingUtils.makeSureLogicQueueMappingItemImmutable(oldItems, newItems, epochEqual);
                 }
             }
             topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 5944155..77add20 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -36,6 +36,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+
+import com.alibaba.fastjson.JSON;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.admin.MQAdminExtInner;
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
index 245cc3a..4e00bc3 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
@@ -1,5 +1,6 @@
 package org.apache.rocketmq.common.rpc;
 
+import com.alibaba.fastjson.JSON;
 import io.netty.util.concurrent.ImmediateEventExecutor;
 import io.netty.util.concurrent.Promise;
 import org.apache.rocketmq.common.message.MessageQueue;
@@ -65,6 +66,15 @@ public class RpcClientImpl implements RpcClient {
                 case RequestCode.PULL_MESSAGE:
                     rpcResponsePromise = handlePullMessage(addr, request, timeoutMs);
                     break;
+                case RequestCode.GET_MIN_OFFSET:
+                    rpcResponsePromise = handleGetMinOffset(addr, request, timeoutMs);
+                    break;
+                case RequestCode.SEARCH_OFFSET_BY_TIMESTAMP:
+                    rpcResponsePromise = handleSearchOffset(addr, request, timeoutMs);
+                    break;
+                case RequestCode.GET_EARLIEST_MSG_STORETIME:
+                    rpcResponsePromise = handleGetEarliestMsgStoretime(addr, request, timeoutMs);
+                    break;
                 default:
                     throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getCode());
             }
@@ -146,8 +156,9 @@ public class RpcClientImpl implements RpcClient {
         return rpcResponsePromise;
     }
 
-    public RpcResponse handleSearchOffset(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
-        String addr = getBrokerAddrByNameOrException(bname);
+    public Promise<RpcResponse> handleSearchOffset(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
+        final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
+
         RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
         RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
         assert responseCommand != null;
@@ -155,17 +166,18 @@ public class RpcClientImpl implements RpcClient {
             case ResponseCode.SUCCESS: {
                 SearchOffsetResponseHeader responseHeader =
                         (SearchOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
-                return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
+                rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()));
+                break;
             }
             default:{
-                RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error"));
-                return rpcResponse;
+                rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
             }
         }
+        return rpcResponsePromise;
     }
 
-    public RpcResponse handleGetMinOffset(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
-        String addr = getBrokerAddrByNameOrException(bname);
+    public Promise<RpcResponse> handleGetMinOffset(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
+        final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
 
         RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
 
@@ -175,17 +187,18 @@ public class RpcClientImpl implements RpcClient {
             case ResponseCode.SUCCESS: {
                 GetMinOffsetResponseHeader responseHeader =
                         (GetMinOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
-                return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
+                rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()));
+                break;
             }
             default:{
-                RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error"));
-                return rpcResponse;
+                rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
             }
         }
+        return rpcResponsePromise;
     }
 
-    public RpcResponse handleGetEarliestMsgStoretime(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
-        String addr = getBrokerAddrByNameOrException(bname);
+    public Promise<RpcResponse> handleGetEarliestMsgStoretime(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
+        final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
 
         RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
 
@@ -195,14 +208,14 @@ public class RpcClientImpl implements RpcClient {
             case ResponseCode.SUCCESS: {
                 GetEarliestMsgStoretimeResponseHeader responseHeader =
                         (GetEarliestMsgStoretimeResponseHeader) responseCommand.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class);
-                return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
-
+                rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()));
+                break;
             }
             default:{
-                RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error"));
-                return rpcResponse;
+                rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
             }
         }
+        return rpcResponsePromise;
     }
 
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java
index ebae12b..61dce64 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java
@@ -21,6 +21,9 @@ public class RpcClientUtils {
     }
 
     public static byte[] encodeBody(Object body) {
+        if (body == null) {
+            return null;
+        }
         if (body instanceof byte[]) {
             return (byte[])body;
         } else if (body instanceof RemotingSerializable) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java
index d6d359d..4a788ab 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java
@@ -35,6 +35,7 @@ public class TopicQueueMappingContext  {
         this.mappingDetail = mappingDetail;
         this.mappingItemList = mappingItemList;
         this.mappingItem = mappingItem;
+
     }
 
     public boolean checkIfAsPhysical() {
@@ -43,6 +44,37 @@ public class TopicQueueMappingContext  {
                 || (mappingItemList.size() == 1 &&  mappingItemList.get(0).getLogicOffset() == 0);
     }
 
+    public boolean isLeader() {
+        if (mappingDetail == null
+                || mappingItemList == null
+                || mappingItemList.isEmpty()) {
+            return false;
+        }
+        LogicQueueMappingItem mappingItem = mappingItemList.get(mappingItemList.size() - 1);
+        return mappingItem.getBname().equals(mappingDetail.getBname());
+    }
+
+    public LogicQueueMappingItem findNext() {
+        if (mappingDetail == null
+                || mappingItem == null
+                || mappingItemList == null
+                || mappingItemList.isEmpty()) {
+            return null;
+        }
+        for (int i = 0; i < mappingItemList.size(); i++) {
+            LogicQueueMappingItem item = mappingItemList.get(i);
+            if (item.getGen() == mappingItem.getGen()) {
+                if (i < mappingItemList.size() - 1) {
+                    return mappingItemList.get(i + 1);
+                } else {
+                    return null;
+                }
+            }
+        }
+        return null;
+    }
+
+
     public String getTopic() {
         return topic;
     }
@@ -90,4 +122,6 @@ public class TopicQueueMappingContext  {
     public void setMappingItem(LogicQueueMappingItem mappingItem) {
         this.mappingItem = mappingItem;
     }
+
+
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
index 86a6cec..0659572 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
@@ -85,7 +85,8 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
         //Could use bi-search to polish performance
         for (int i = mappingItems.size() - 1; i >= 0; i--) {
             LogicQueueMappingItem item =  mappingItems.get(i);
-            if (logicOffset >= item.getLogicOffset()) {
+            if (item.getLogicOffset() >= 0
+                    && logicOffset >= item.getLogicOffset()) {
                 return item;
             }
         }
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index ef565a0..5527974 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -36,6 +36,8 @@ import java.util.Set;
 
 public class TopicQueueMappingUtils {
 
+    public static final int DEFAULT_BLOCK_SEQ_SIZE = 10000;
+
     public static class MappingAllocator {
         Map<String, Integer> brokerNumMap = new HashMap<String, Integer>();
         Map<Integer, String> idToBroker = new HashMap<Integer, String>();
@@ -191,7 +193,7 @@ public class TopicQueueMappingUtils {
         return new AbstractMap.SimpleEntry<Long, Integer>(maxEpoch, maxNum);
     }
 
-    public static void makeSureLogicQueueMappingItemImmutable(List<LogicQueueMappingItem> oldItems, List<LogicQueueMappingItem> newItems) {
+    public static void makeSureLogicQueueMappingItemImmutable(List<LogicQueueMappingItem> oldItems, List<LogicQueueMappingItem> newItems, boolean epochEqual) {
         if (oldItems == null || oldItems.isEmpty()) {
             return;
         }
@@ -218,6 +220,16 @@ public class TopicQueueMappingUtils {
                 inew++;
             }
         }
+        if (epochEqual) {
+            LogicQueueMappingItem oldLeader = oldItems.get(oldItems.size() - 1);
+            LogicQueueMappingItem newLeader = newItems.get(newItems.size() - 1);
+            if (newLeader.getGen() != oldLeader.getGen()
+                || !newLeader.getBname().equals(oldLeader.getBname())
+                || newLeader.getQueueId() != oldLeader.getQueueId()
+                || newLeader.getStartOffset() != oldLeader.getStartOffset()) {
+                throw new RuntimeException("The new leader is different but epoch equal");
+            }
+        }
     }
 
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java b/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java
index ce7558f..20b8ad2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java
+++ b/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java
@@ -69,6 +69,10 @@ public class PullSysFlag {
         return (sysFlag & FLAG_SUSPEND) == FLAG_SUSPEND;
     }
 
+    public static int clearSuspendFlag(final int sysFlag) {
+        return sysFlag & (~FLAG_SUSPEND);
+    }
+
     public static boolean hasSubscriptionFlag(final int sysFlag) {
         return (sysFlag & FLAG_SUBSCRIPTION) == FLAG_SUBSCRIPTION;
     }
diff --git a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java
index 9b9ab6b..b4a8dda 100644
--- a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java
@@ -4,9 +4,11 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.google.common.collect.ImmutableList;
+import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.experimental.theories.suppliers.TestedOn;
 
 import java.util.Map;
 
@@ -53,4 +55,9 @@ public class TopicQueueMappingTest {
             Assert.assertEquals(mappingDetailJson, RemotingSerializable.toJson(mappingDetailFromJson, false));
         }
     }
+
+    @Test
+    public void test() {
+
+    }
 }
diff --git a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtilsTest.java
similarity index 99%
rename from common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java
rename to common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtilsTest.java
index bd4b13c..93cad48 100644
--- a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtilsTest.java
@@ -13,7 +13,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 
-public class TopicMappingUtilsTest {
+public class TopicQueueMappingUtilsTest {
 
 
     private Set<String> buildTargetBrokers(int num) {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index f895b63..0e32226 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -28,8 +28,13 @@ import java.lang.annotation.Annotation;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class RemotingCommand {
@@ -313,11 +318,17 @@ public class RemotingCommand {
         return objectHeader;
     }
 
-    private Field[] getClazzFields(Class<? extends CommandCustomHeader> classHeader) {
+    //make it able to test
+    Field[] getClazzFields(Class<? extends CommandCustomHeader> classHeader) {
         Field[] field = CLASS_HASH_MAP.get(classHeader);
 
         if (field == null) {
-            field = classHeader.getDeclaredFields();
+            Set<Field> fieldList = new HashSet<Field>();
+            for (Class className = classHeader; className != Object.class; className = className.getSuperclass()) {
+                Field[] fields = className.getDeclaredFields();
+                fieldList.addAll(Arrays.asList(fields));
+            }
+            field = fieldList.toArray(new Field[0]);
             synchronized (CLASS_HASH_MAP) {
                 CLASS_HASH_MAP.put(classHeader, field);
             }
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
index 2bd41ce..f2f6935 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
@@ -19,9 +19,14 @@ package org.apache.rocketmq.remoting.protocol;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Set;
+
+import com.alibaba.fastjson.JSON;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.junit.Assert;
 import org.junit.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -198,6 +203,28 @@ public class RemotingCommandTest {
         Field value = FieldTestClass.class.getDeclaredField("value");
         assertThat(method.invoke(remotingCommand, value)).isEqualTo(false);
     }
+
+    @Test
+    public void testParentField() throws Exception {
+        SubExtFieldsHeader subExtFieldsHeader = new SubExtFieldsHeader();
+        RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(1, subExtFieldsHeader);
+        Field[] fields  = remotingCommand.getClazzFields(subExtFieldsHeader.getClass());
+        Assert.assertEquals(7, fields.length);
+        Set<String> names = new HashSet<>();
+        names.add("stringValue");
+        names.add("intValue");
+        names.add("longValue");
+        names.add("booleanValue");
+        names.add("doubleValue");
+        names.add("name");
+        names.add("value");
+        for (Field field : fields) {
+            Assert.assertTrue(names.contains(field.getName()));
+        }
+        remotingCommand.makeCustomHeaderToNet();
+        SubExtFieldsHeader other = (SubExtFieldsHeader) remotingCommand.decodeCommandCustomHeader(subExtFieldsHeader.getClass());
+        Assert.assertEquals(other, subExtFieldsHeader);
+    }
 }
 
 class FieldTestClass {
@@ -246,4 +273,72 @@ class ExtFieldsHeader implements CommandCustomHeader {
     public double getDoubleValue() {
         return doubleValue;
     }
-}
\ No newline at end of file
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (!(o instanceof ExtFieldsHeader)) return false;
+
+        ExtFieldsHeader that = (ExtFieldsHeader) o;
+
+        if (intValue != that.intValue) return false;
+        if (longValue != that.longValue) return false;
+        if (booleanValue != that.booleanValue) return false;
+        if (Double.compare(that.doubleValue, doubleValue) != 0) return false;
+        return stringValue != null ? stringValue.equals(that.stringValue) : that.stringValue == null;
+    }
+
+    @Override
+    public int hashCode() {
+        int result;
+        long temp;
+        result = stringValue != null ? stringValue.hashCode() : 0;
+        result = 31 * result + intValue;
+        result = 31 * result + (int) (longValue ^ (longValue >>> 32));
+        result = 31 * result + (booleanValue ? 1 : 0);
+        temp = Double.doubleToLongBits(doubleValue);
+        result = 31 * result + (int) (temp ^ (temp >>> 32));
+        return result;
+    }
+}
+
+
+class SubExtFieldsHeader extends ExtFieldsHeader {
+    private String name = "12321";
+    private int value = 111;
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public int getValue() {
+        return value;
+    }
+
+    public void setValue(int value) {
+        this.value = value;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (!(o instanceof SubExtFieldsHeader)) return false;
+        if (!super.equals(o)) return false;
+
+        SubExtFieldsHeader that = (SubExtFieldsHeader) o;
+
+        if (value != that.value) return false;
+        return name != null ? name.equals(that.name) : that.name == null;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = super.hashCode();
+        result = 31 * result + (name != null ? name.hashCode() : 0);
+        result = 31 * result + value;
+        return result;
+    }
+}
diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java
index ce739be..71f9088 100644
--- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java
@@ -49,6 +49,7 @@ public class RMQNormalConsumer extends AbstractMQConsumer {
         consumer = new DefaultMQPushConsumer(consumerGroup);
         consumer.setInstanceName(RandomUtil.getStringByUUID());
         consumer.setNamesrvAddr(nsAddr);
+        consumer.setPollNameServerInterval(100);
         try {
             consumer.subscribe(topic, subExpression);
         } catch (MQClientException e) {
@@ -92,4 +93,8 @@ public class RMQNormalConsumer extends AbstractMQConsumer {
         create();
         start();
     }
+
+    public DefaultMQPushConsumer getConsumer() {
+        return consumer;
+    }
 }
diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java
index 018623d..4f5d38e 100644
--- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java
@@ -73,6 +73,7 @@ public class RMQNormalProducer extends AbstractMQProducer {
         producer.setProducerGroup(getProducerGroupName());
         producer.setInstanceName(getProducerInstanceName());
         producer.setUseTLS(useTLS);
+        producer.setPollNameServerInterval(100);
 
         if (nsAddr != null) {
             producer.setNamesrvAddr(nsAddr);
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
index e573180..53a7ab3 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@ -100,7 +100,10 @@ public class BaseConf {
                 List<BrokerData> brokerDatas = mqAdminExt.examineTopicRouteInfo(clusterName).getBrokerDatas();
                 return brokerDatas.size() == brokerNum;
             });
-        } catch (MQClientException e) {
+            for (BrokerController brokerController: brokerControllerList) {
+                brokerController.getBrokerOuterAPI().refreshMetadata();
+            }
+        } catch (Exception e) {
             log.error("init failed, please check BaseConf");
         }
         ForkJoinPool.commonPool().execute(mqAdminExt::shutdown);
@@ -126,6 +129,7 @@ public class BaseConf {
     public static DefaultMQAdminExt getAdmin(String nsAddr) {
         final DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(500);
         mqAdminExt.setNamesrvAddr(nsAddr);
+        mqAdminExt.setPollNameServerInterval(100);
         mqClients.add(mqAdminExt);
         return mqAdminExt;
     }
diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
index feb8730..03a92e8 100644
--- a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
@@ -2,6 +2,7 @@ package org.apache.rocketmq.test.smoke;
 
 import org.apache.log4j.Logger;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageClientExt;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -11,6 +12,7 @@ import org.apache.rocketmq.common.rpc.ClientMetadata;
 import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
+import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
 import org.apache.rocketmq.test.base.BaseConf;
 import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
 import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
@@ -23,6 +25,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.FixMethodOrder;
 import org.junit.Test;
+import sun.jvm.hotspot.runtime.aarch64.AARCH64CurrentFrameGuess;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -61,7 +64,7 @@ public class StaticTopicIT extends BaseConf {
         Map<String, TopicConfigAndQueueMapping> brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
         Assert.assertTrue(brokerConfigMap.isEmpty());
         TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, new HashSet<>(), brokerConfigMap);
-        Assert.assertEquals(2, brokerConfigMap.size());
+        Assert.assertEquals(targetBrokers.size(), brokerConfigMap.size());
         //If some succeed, and others fail, it will cause inconsistent data
         for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
             String broker = entry.getKey();
@@ -72,6 +75,22 @@ public class StaticTopicIT extends BaseConf {
         return brokerConfigMap;
     }
 
+    public void remappingStaticTopic(String topic, Set<String> targetBrokers) throws Exception {
+        Map<String, TopicConfigAndQueueMapping> brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
+        Assert.assertFalse(brokerConfigMap.isEmpty());
+        TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.remappingStaticTopic(topic, brokerConfigMap, targetBrokers);
+        defaultMQAdminExt.remappingStaticTopic(clientMetadata, topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), brokerConfigMap, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, false);
+    }
+
+
+
+
+    @Test
+    public void testNonTargetBrokers() {
+
+    }
+
+
     @Test
     public void testCreateProduceConsumeStaticTopic() throws Exception {
         String topic = "static" + MQRandomUtils.getRandomTopic();
@@ -120,7 +139,7 @@ public class StaticTopicIT extends BaseConf {
         Assert.assertEquals(msgEachQueue * queueNum, producer.getAllOriginMsg().size());
         Assert.assertEquals(0, producer.getSendErrorMsg().size());
 
-        consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+        consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 3000);
         assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
                 consumer.getListener().getAllMsgBody()))
                 .containsExactlyElementsIn(producer.getAllMsgBody());
@@ -149,6 +168,110 @@ public class StaticTopicIT extends BaseConf {
     }
 
 
+    @Test
+    public void testRemappingProduceConsumeStaticTopic() throws Exception {
+        String topic = "static" + MQRandomUtils.getRandomTopic();
+        RMQNormalProducer producer = getProducer(nsAddr, topic);
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
+        producer.getProducer().setPollNameServerInterval(100);
+
+        int queueNum = 10;
+        int msgEachQueue = 100;
+        //create static topic
+        {
+            Set<String> targetBrokers = new HashSet<>();
+            targetBrokers.add(broker1Name);
+            createStaticTopic(topic, queueNum, targetBrokers);
+        }
+        //produce the messages
+        {
+            List<MessageQueue> messageQueueList = producer.getMessageQueue();
+            for (int i = 0; i < queueNum; i++) {
+                MessageQueue messageQueue = messageQueueList.get(i);
+                Assert.assertEquals(i, messageQueue.getQueueId());
+                Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
+            }
+            for(MessageQueue messageQueue: messageQueueList) {
+                producer.send(msgEachQueue, messageQueue);
+            }
+            Assert.assertEquals(0, producer.getSendErrorMsg().size());
+            //leave the time to build the cq
+            Thread.sleep(100);
+            for(MessageQueue messageQueue: messageQueueList) {
+                //Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
+                Assert.assertEquals(msgEachQueue, defaultMQAdminExt.maxOffset(messageQueue));
+            }
+        }
+
+        //remapping the static topic
+        {
+            Set<String> targetBrokers = new HashSet<>();
+            targetBrokers.add(broker2Name);
+            remappingStaticTopic(topic, targetBrokers);
+            Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
+
+            TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap);
+            Map<Integer, TopicQueueMappingOne>  globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
+            Assert.assertEquals(queueNum, globalIdMap.size());
+            for (TopicQueueMappingOne mappingOne: globalIdMap.values()) {
+                Assert.assertEquals(broker2Name, mappingOne.getBname());
+            }
+        }
+        //leave the time to refresh the metadata
+        Thread.sleep(500);
+        producer.setDebug(true);
+        {
+            List<MessageQueue> messageQueueList = producer.getMessageQueue();
+            for (int i = 0; i < queueNum; i++) {
+                MessageQueue messageQueue = messageQueueList.get(i);
+                Assert.assertEquals(i, messageQueue.getQueueId());
+                Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
+                String destBrokerName = clientMetadata.getBrokerNameFromMessageQueue(messageQueue);
+                Assert.assertEquals(destBrokerName, broker2Name);
+            }
+
+            for(MessageQueue messageQueue: messageQueueList) {
+                producer.send(msgEachQueue, messageQueue);
+            }
+            Assert.assertEquals(0, producer.getSendErrorMsg().size());
+            //leave the time to build the cq
+            Thread.sleep(100);
+            for(MessageQueue messageQueue: messageQueueList) {
+                Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
+                Assert.assertEquals(msgEachQueue + TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, defaultMQAdminExt.maxOffset(messageQueue));
+            }
+        }
+        {
+            consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 3000);
+            System.out.println("Consume: " + consumer.getListener().getAllMsgBody().size());
+            assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+                    consumer.getListener().getAllMsgBody()))
+                    .containsExactlyElementsIn(producer.getAllMsgBody());
+            Map<Integer, List<MessageExt>> messagesByQueue = new HashMap<>();
+            for (Object object : consumer.getListener().getAllOriginMsg()) {
+                MessageExt messageExt = (MessageExt) object;
+                if (!messagesByQueue.containsKey(messageExt.getQueueId())) {
+                    messagesByQueue.put(messageExt.getQueueId(), new ArrayList<>());
+                }
+                messagesByQueue.get(messageExt.getQueueId()).add(messageExt);
+            }
+            Assert.assertEquals(queueNum, messagesByQueue.size());
+            for (int i = 0; i < queueNum; i++) {
+                List<MessageExt> messageExts = messagesByQueue.get(i);
+                Assert.assertEquals(msgEachQueue, messageExts.size());
+                Collections.sort(messageExts, new Comparator<MessageExt>() {
+                    @Override
+                    public int compare(MessageExt o1, MessageExt o2) {
+                        return (int) (o1.getQueueOffset() - o2.getQueueOffset());
+                    }
+                });
+                for (int j = 0; j < msgEachQueue; j++) {
+                    Assert.assertEquals(j, messageExts.get(j).getQueueOffset());
+                }
+            }
+        }
+    }
+
     @After
     public void tearDown() {
         super.shutdown();
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 98694d2..07c4bf3 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -1112,6 +1112,15 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
             TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
             createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
         }
+        //Step5: write the non-target brokers
+        for (String broker: brokerConfigMap.keySet()) {
+            if (brokersToMapIn.contains(broker) || brokersToMapOut.contains(broker)) {
+                continue;
+            }
+            String addr = clientMetadata.findMasterBrokerAddr(broker);
+            TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
+            createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
+        }
     }
 
     @Override