You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/08 09:28:15 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated (1d7807b -> 48db31b)

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a change to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git.


    from 1d7807b  Finish the test for topic queue mapping clean service
     new 8b747f9  Try using the new style to handle get min offset
     new 48db31b  Finish the test for topicStats

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../broker/processor/AdminBrokerProcessor.java     | 145 ++++++--
 .../broker/processor/ConsumerManageProcessor.java  |  12 +-
 .../broker/processor/PullMessageProcessor.java     |  16 +-
 .../broker/topic/TopicQueueMappingManager.java     |  34 +-
 .../header/GetTopicStatsInfoRequestHeader.java     |   5 +-
 .../apache/rocketmq/common/rpc/RequestBuilder.java |  16 +-
 .../apache/rocketmq/common/rpc/RpcClientImpl.java  |   1 +
 .../rocketmq/common/rpc/RpcRequestHeader.java      |  32 +-
 .../apache/rocketmq/common/rpc/RpcResponse.java    |   2 +-
 .../common/rpc/TopicQueueRequestHeader.java        |  14 +-
 ...eRequestHeader.java => TopicRequestHeader.java} |  22 +-
 .../rocketmq/test/statictopic/StaticTopicIT.java   | 405 ++++++---------------
 12 files changed, 307 insertions(+), 397 deletions(-)
 copy common/src/main/java/org/apache/rocketmq/common/rpc/{TopicQueueRequestHeader.java => TopicRequestHeader.java} (71%)

[rocketmq] 02/02: Finish the test for topicStats

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 48db31b4815af2d2a3c07c1944a69f572e4425d7
Author: dongeforever <do...@apache.org>
AuthorDate: Wed Dec 8 17:27:55 2021 +0800

    Finish the test for topicStats
---
 .../broker/processor/AdminBrokerProcessor.java     |  97 ++++-
 .../broker/processor/ConsumerManageProcessor.java  |  12 +-
 .../broker/processor/PullMessageProcessor.java     |  16 +-
 .../broker/topic/TopicQueueMappingManager.java     |  34 +-
 .../header/GetTopicStatsInfoRequestHeader.java     |   5 +-
 .../apache/rocketmq/common/rpc/RequestBuilder.java |  16 +-
 .../apache/rocketmq/common/rpc/RpcClientImpl.java  |   1 +
 .../rocketmq/common/rpc/RpcRequestHeader.java      |  32 +-
 .../common/rpc/TopicQueueRequestHeader.java        |  14 +-
 ...eRequestHeader.java => TopicRequestHeader.java} |  22 +-
 .../rocketmq/test/statictopic/StaticTopicIT.java   | 405 ++++++---------------
 11 files changed, 278 insertions(+), 376 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index cca96f1..fb4a711 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -30,6 +30,7 @@ import org.apache.rocketmq.broker.filter.ConsumerFilterData;
 import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
 import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
 import org.apache.rocketmq.common.AclConfig;
+import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.PlainAccessConfig;
@@ -104,6 +105,7 @@ import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHead
 import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader;
 import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.rpc.RpcClient;
 import org.apache.rocketmq.common.rpc.RpcClientUtils;
 import org.apache.rocketmq.common.rpc.RpcException;
 import org.apache.rocketmq.common.rpc.RpcRequest;
@@ -159,9 +161,13 @@ import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorRe
 public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private final BrokerController brokerController;
+    private final RpcClient rpcClient;
+    private final BrokerConfig brokerConfig;
 
     public AdminBrokerProcessor(final BrokerController brokerController) {
         this.brokerController = brokerController;
+        this.brokerConfig = brokerController.getBrokerConfig();
+        this.rpcClient = brokerController.getBrokerOuterAPI().getRpcClient();
     }
 
     @Override
@@ -650,7 +656,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
                         break;
                     }
                 } else {
-                    requestHeader.setPhysical(true);
+                    requestHeader.setLo(false);
                     requestHeader.setTimestamp(timestamp);
                     requestHeader.setQueueId(item.getQueueId());
                     requestHeader.setBname(item.getBname());
@@ -720,7 +726,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
             assert maxItem != null;
             assert maxItem.getLogicOffset() >= 0;
             requestHeader.setBname(maxItem.getBname());
-            requestHeader.setPhysical(true);
+            requestHeader.setLo(false);
             requestHeader.setQueueId(mappingItem.getQueueId());
 
             long maxPhysicalOffset = Long.MAX_VALUE;
@@ -770,7 +776,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         return response;
     }
 
-    private CompletableFuture<RpcResponse> handleGetMinOffsetForStaticTopic(GetMinOffsetRequestHeader requestHeader, TopicQueueMappingContext mappingContext)  {
+    private CompletableFuture<RpcResponse> handleGetMinOffsetForStaticTopic(RpcRequest request, TopicQueueMappingContext mappingContext)  {
         if (mappingContext.getMappingDetail() == null) {
             return null;
         }
@@ -778,14 +784,14 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         if (!mappingContext.isLeader()) {
             //this may not
             return CompletableFuture.completedFuture(new RpcResponse(new RpcException(ResponseCode.NOT_LEADER_FOR_QUEUE,
-                    String.format("%s-%d is not leader in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()))));
+                    String.format("%s-%d is not leader in broker %s, request code %d", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname(), request.getCode()))));
         };
-
+        GetMinOffsetRequestHeader requestHeader = (GetMinOffsetRequestHeader) request.getHeader();
         LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), 0L, true);
         assert  mappingItem != null;
         try {
             requestHeader.setBname(mappingItem.getBname());
-            requestHeader.setPhysical(true);
+            requestHeader.setLo(false);
             requestHeader.setQueueId(mappingItem.getQueueId());
             long physicalOffset;
             //run in local
@@ -815,7 +821,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         assert request.getCode() == RequestCode.GET_MIN_OFFSET;
         GetMinOffsetRequestHeader requestHeader = (GetMinOffsetRequestHeader) request.getHeader();
         TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false);
-        CompletableFuture<RpcResponse> rewriteResult = handleGetMinOffsetForStaticTopic(requestHeader, mappingContext);
+        CompletableFuture<RpcResponse> rewriteResult = handleGetMinOffsetForStaticTopic(request, mappingContext);
         if (rewriteResult != null) {
             return rewriteResult;
         }
@@ -851,7 +857,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         assert mappingItem != null;
         try {
             requestHeader.setBname(mappingItem.getBname());
-            requestHeader.setPhysical(true);
+            requestHeader.setLo(false);
             RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_EARLIEST_MSG_STORETIME, requestHeader, null);
             //TODO check if it is in current broker
             RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
@@ -1006,6 +1012,70 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         return response;
     }
 
+    private RpcResponse handleGetTopicStatsInfoForStaticTopic(RpcRequest request, TopicQueueMappingContext mappingContext) {
+        try {
+            assert request.getCode() == RequestCode.GET_TOPIC_STATS_INFO;
+            if (mappingContext.getMappingDetail() == null) {
+                return null;
+            }
+            final GetTopicStatsInfoRequestHeader requestHeader = (GetTopicStatsInfoRequestHeader) request.getHeader();
+            String topic = requestHeader.getTopic();
+            TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
+            Map<Integer, LogicQueueMappingItem[]> qidItemMap = new HashMap<>();
+            Set<String> brokers = new HashSet<>();
+            mappingDetail.getHostedQueues().forEach((qid, items) -> {
+                if (TopicQueueMappingUtils.checkIfLeader(items, mappingDetail)) {
+                    LogicQueueMappingItem[] itemPair = new LogicQueueMappingItem[2];
+                    itemPair[0] = TopicQueueMappingUtils.findLogicQueueMappingItem(items, 0, true);
+                    itemPair[1] = TopicQueueMappingUtils.findLogicQueueMappingItem(items, Long.MAX_VALUE, true);
+                    assert itemPair[0] != null && itemPair[1] != null;
+                    qidItemMap.put(qid, itemPair);
+                    brokers.add(itemPair[0].getBname());
+                    brokers.add(itemPair[1].getBname());
+                }
+            });
+            Map<String, TopicStatsTable> statsTable = new HashMap<>();
+            for (String broker: brokers) {
+                GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader();
+                header.setTopic(topic);
+                header.setBname(broker);
+                header.setLo(false);
+                RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null);
+                RpcResponse rpcResponse = rpcClient.invoke(rpcRequest, brokerConfig.getForwardTimeout()).get();
+                if (rpcResponse.getException() != null) {
+                    throw rpcResponse.getException();
+                }
+                statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody());
+            }
+            TopicStatsTable topicStatsTable = new TopicStatsTable();
+            qidItemMap.forEach( (qid, itemPair) -> {
+                LogicQueueMappingItem minItem = itemPair[0];
+                LogicQueueMappingItem maxItem = itemPair[1];
+                TopicOffset minTopicOffset = statsTable.get(minItem.getBname()).getOffsetTable().get(new MessageQueue(topic, minItem.getBname(), minItem.getQueueId()));
+                TopicOffset maxTopicOffset = statsTable.get(maxItem.getBname()).getOffsetTable().get(new MessageQueue(topic, maxItem.getBname(), maxItem.getQueueId()));
+
+                assert  minTopicOffset != null && maxTopicOffset != null;
+
+                long min = minItem.computeStaticQueueOffsetLoosely(minTopicOffset.getMinOffset());
+                if (min < 0)
+                    min = 0;
+                long max = maxItem.computeStaticQueueOffsetStrictly(maxTopicOffset.getMaxOffset());
+                if (max < 0)
+                    max = 0;
+                long timestamp = maxTopicOffset.getLastUpdateTimestamp();
+
+                TopicOffset topicOffset = new TopicOffset();
+                topicOffset.setMinOffset(min);
+                topicOffset.setMaxOffset(max);
+                topicOffset.setLastUpdateTimestamp(timestamp);
+                topicStatsTable.getOffsetTable().put(new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, qid), topicOffset);
+            });
+            return new RpcResponse(ResponseCode.SUCCESS, null, topicStatsTable);
+        } catch (Throwable t) {
+            return new RpcResponse(new RpcException(ResponseCode.SYSTEM_ERROR, t.getMessage(), t));
+        }
+    }
+
     private RemotingCommand getTopicStatsInfo(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
@@ -1019,8 +1089,17 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
             response.setRemark("topic[" + topic + "] not exist");
             return response;
         }
-
         TopicStatsTable topicStatsTable = new TopicStatsTable();
+        TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false);
+        RpcResponse rpcResponse = handleGetTopicStatsInfoForStaticTopic(new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, requestHeader, null), mappingContext);
+        if (rpcResponse != null) {
+            if (rpcResponse.getException() != null) {
+                return RpcClientUtils.createCommandForRpcResponse(rpcResponse);
+            } else {
+                topicStatsTable.getOffsetTable().putAll(((TopicStatsTable)rpcResponse.getBody()).getOffsetTable());
+            }
+        }
+
         for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
             MessageQueue mq = new MessageQueue();
             mq.setTopic(topic);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
index 0443396..66abe62 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -153,11 +153,8 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
             //by default, it is -1
             long offset = -1;
             //double read, first from leader, then from second leader
-            for (int i = 1; i <= 2; i++) {
-                if (itemList.size() - i < 0) {
-                    break;
-                }
-                LogicQueueMappingItem mappingItem = itemList.get(itemList.size() - i);
+            for (int i = itemList.size() - 1; i >= 0; i--) {
+                LogicQueueMappingItem mappingItem = itemList.get(i);
                 if (mappingItem.getBname().equals(mappingDetail.getBname())) {
                     offset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(), requestHeader.getTopic(), mappingItem.getQueueId());
                     if (offset >= 0) {
@@ -170,7 +167,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
                     //maybe we need to reconstruct an object
                     requestHeader.setBname(mappingItem.getBname());
                     requestHeader.setQueueId(mappingItem.getQueueId());
-                    requestHeader.setPhysical(true);
+                    requestHeader.setLo(false);
                     requestHeader.setSetZeroIfNotFound(false);
                     RpcRequest rpcRequest = new RpcRequest(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader, null);
                     RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
@@ -179,7 +176,8 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
                     }
                     if (rpcResponse.getCode() == ResponseCode.SUCCESS) {
                         offset = ((QueryConsumerOffsetResponseHeader) rpcResponse.getHeader()).getOffset();
-                    } else if (rpcResponse.getCode() == ResponseCode.PULL_NOT_FOUND){
+                        break;
+                    } else if (rpcResponse.getCode() == ResponseCode.QUERY_NOT_FOUND){
                         continue;
                     } else {
                         //this should not happen
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 9b8135e..0f1ca23 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -134,18 +134,14 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
                     && requestHeader.getMaxMsgNums() != null) {
                 requestHeader.setMaxMsgNums((int) Math.min(mappingItem.getEndOffset() - mappingItem.getStartOffset(), requestHeader.getMaxMsgNums()));
             }
-            int sysFlag = requestHeader.getSysFlag();
-            if (!mappingContext.isLeader()) {
-                sysFlag = PullSysFlag.clearCommitOffsetFlag(sysFlag);
-                requestHeader.setSysFlag(sysFlag);
-            }
 
             if (mappingDetail.getBname().equals(bname)) {
                 //just let it go, do the local pull process
                 return null;
             }
 
-            requestHeader.setPhysical(true);
+            int sysFlag = requestHeader.getSysFlag();
+            requestHeader.setLo(false);
             requestHeader.setBname(bname);
             sysFlag = PullSysFlag.clearSuspendFlag(sysFlag);
             sysFlag = PullSysFlag.clearCommitOffsetFlag(sysFlag);
@@ -189,11 +185,13 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
             long minOffset = responseHeader.getMinOffset();
             long maxOffset = responseHeader.getMaxOffset();
             int responseCode = code;
+
             //consider the following situations
             // 1. read from slave, currently not supported
             // 2. the middle queue is truncated because of deleting commitlog
             if (code != ResponseCode.SUCCESS) {
                 //note the currentItem maybe both the leader and  the earliest
+                boolean isRevised = false;
                 if (leaderItem.getGen() == currentItem.getGen()) {
                     //read the leader
                     if (requestOffset > maxOffset) {
@@ -228,6 +226,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
                         //just move to another item
                         LogicQueueMappingItem nextItem = TopicQueueMappingUtils.findNext(mappingContext.getMappingItemList(), currentItem, true);
                         if (nextItem != null) {
+                            isRevised = true;
                             currentItem = nextItem;
                             nextBeginOffset = currentItem.getStartOffset();
                             minOffset = currentItem.getStartOffset();
@@ -244,7 +243,8 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
                 }
 
                 //read from the middle item, ignore the PULL_OFFSET_MOVED
-                if (leaderItem.getGen() != currentItem.getGen()
+                if (!isRevised
+                    && leaderItem.getGen() != currentItem.getGen()
                     && earlistItem.getGen() != currentItem.getGen()) {
                     if (requestOffset < minOffset) {
                         nextBeginOffset = minOffset;
@@ -289,7 +289,6 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
                 return null;
             }
         } catch (Throwable t) {
-            t.printStackTrace();
             return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
         }
     }
@@ -443,6 +442,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
             return response;
         }
 
+
         MessageFilter messageFilter;
         if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
             messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index e76d25e..6965053 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -30,6 +30,7 @@ import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader;
 import org.apache.rocketmq.common.rpc.RpcRequest;
 import org.apache.rocketmq.common.rpc.RpcResponse;
 import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
+import org.apache.rocketmq.common.rpc.TopicRequestHeader;
 import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
 import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
@@ -190,28 +191,37 @@ public class TopicQueueMappingManager extends ConfigManager {
         return dataVersion;
     }
 
-    public TopicQueueMappingContext buildTopicQueueMappingContext(TopicQueueRequestHeader requestHeader) {
+    public TopicQueueMappingContext buildTopicQueueMappingContext(TopicRequestHeader requestHeader) {
         return buildTopicQueueMappingContext(requestHeader, false);
     }
 
     //Do not return a null context
-    public TopicQueueMappingContext buildTopicQueueMappingContext(TopicQueueRequestHeader requestHeader, boolean selectOneWhenMiss) {
-        if (requestHeader.getPhysical() != null
-                && Boolean.TRUE.equals(requestHeader.getPhysical())) {
-            return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null);
+    public TopicQueueMappingContext buildTopicQueueMappingContext(TopicRequestHeader requestHeader, boolean selectOneWhenMiss) {
+        //should disable logic queue explicitly, otherwise the old client may cause dirty data to newly created static topic
+        if (requestHeader.getLo() != null
+                && Boolean.FALSE.equals(requestHeader.getLo())) {
+            return new TopicQueueMappingContext(requestHeader.getTopic(), null, null, null, null);
         }
-        TopicQueueMappingDetail mappingDetail = getTopicQueueMapping(requestHeader.getTopic());
+        String topic = requestHeader.getTopic();
+        Integer globalId = null;
+        if (requestHeader instanceof  TopicQueueRequestHeader) {
+            globalId = ((TopicQueueRequestHeader) requestHeader).getQueueId();
+        }
+
+        TopicQueueMappingDetail mappingDetail = getTopicQueueMapping(topic);
         if (mappingDetail == null) {
             //it is not static topic
-            return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null);
+            return new TopicQueueMappingContext(topic, null, null, null, null);
         }
-
         assert mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName());
 
+        if (globalId == null) {
+            return new TopicQueueMappingContext(topic, null, mappingDetail, null, null);
+        }
+
         //If not find mappingItem, it encounters some errors
-        Integer globalId = requestHeader.getQueueId();
         if (globalId < 0 && !selectOneWhenMiss) {
-            return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, mappingDetail, null, null);
+            return new TopicQueueMappingContext(topic, globalId, mappingDetail, null, null);
         }
 
         if (globalId < 0) {
@@ -224,7 +234,7 @@ public class TopicQueueMappingManager extends ConfigManager {
             }
         }
         if (globalId < 0) {
-            return new TopicQueueMappingContext(requestHeader.getTopic(), globalId,  mappingDetail, null, null);
+            return new TopicQueueMappingContext(topic, globalId,  mappingDetail, null, null);
         }
 
         List<LogicQueueMappingItem> mappingItemList = TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId);
@@ -233,7 +243,7 @@ public class TopicQueueMappingManager extends ConfigManager {
                 && mappingItemList.size() > 0) {
             leaderItem = mappingItemList.get(mappingItemList.size() - 1);
         }
-        return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, mappingDetail, mappingItemList, leaderItem);
+        return new TopicQueueMappingContext(topic, globalId, mappingDetail, mappingItemList, leaderItem);
     }
 
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicStatsInfoRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicStatsInfoRequestHeader.java
index 8e921b2..f98e150 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicStatsInfoRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicStatsInfoRequestHeader.java
@@ -17,12 +17,11 @@
 
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.common.rpc.RpcRequestHeader;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.TopicRequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class GetTopicStatsInfoRequestHeader extends RpcRequestHeader {
+public class GetTopicStatsInfoRequestHeader extends TopicRequestHeader {
     @CFNotNull
     private String topic;
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
index 4b5c62b..f9478e4 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
@@ -25,7 +25,7 @@ public class RequestBuilder {
         }
         try {
             RpcRequestHeader requestHeader = (RpcRequestHeader) requestHeaderClass.newInstance();
-            requestHeader.setOneway(oneway);
+            requestHeader.setOway(oneway);
             requestHeader.setBname(destBrokerName);
             return requestHeader;
         } catch (Throwable t) {
@@ -37,26 +37,26 @@ public class RequestBuilder {
         return buildTopicQueueRequestHeader(requestCode, null, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), null);
     }
 
-    public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, MessageQueue mq, Boolean physical) {
-        return buildTopicQueueRequestHeader(requestCode, null, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), physical);
+    public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, MessageQueue mq, Boolean logic) {
+        return buildTopicQueueRequestHeader(requestCode, null, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), logic);
     }
 
-    public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, Boolean oneway, MessageQueue mq, Boolean physical) {
-        return buildTopicQueueRequestHeader(requestCode, oneway, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), physical);
+    public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, Boolean oneway, MessageQueue mq, Boolean logic) {
+        return buildTopicQueueRequestHeader(requestCode, oneway, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), logic);
     }
 
-    public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode,  Boolean oneway, String destBrokerName, String topic, int queueId, Boolean physical) {
+    public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode,  Boolean oneway, String destBrokerName, String topic, int queueId, Boolean logic) {
         Class requestHeaderClass = requestCodeMap.get(requestCode);
         if (requestHeaderClass == null) {
             throw new UnsupportedOperationException("unknown " + requestCode);
         }
         try {
             TopicQueueRequestHeader requestHeader = (TopicQueueRequestHeader) requestHeaderClass.newInstance();
-            requestHeader.setOneway(oneway);
+            requestHeader.setOway(oneway);
             requestHeader.setBname(destBrokerName);
             requestHeader.setTopic(topic);
             requestHeader.setQueueId(queueId);
-            requestHeader.setPhysical(physical);
+            requestHeader.setLo(logic);
             return requestHeader;
         } catch (Throwable t) {
             throw new RuntimeException(t);
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
index eaa22be..97879d1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
@@ -214,6 +214,7 @@ public class RpcClientImpl implements RpcClient {
             }
             case ResponseCode.QUERY_NOT_FOUND: {
                 rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), null, null));
+                break;
             }
             default:{
                 rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java
index 577865e..593df81 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java
@@ -20,13 +20,13 @@ import org.apache.rocketmq.remoting.CommandCustomHeader;
 
 public abstract class RpcRequestHeader implements CommandCustomHeader {
     //the namespace name
-    protected String namespace;
+    protected String ns;
     //if the data has been namespaced
-    protected Boolean namespaced;
+    protected Boolean nsd;
     //the abstract remote addr name, usually the physical broker name
     protected String bname;
-
-    protected Boolean oneway;
+    //oneway
+    protected Boolean oway;
 
     public String getBname() {
         return bname;
@@ -36,27 +36,27 @@ public abstract class RpcRequestHeader implements CommandCustomHeader {
         this.bname = bname;
     }
 
-    public Boolean getOneway() {
-        return oneway;
+    public Boolean getOway() {
+        return oway;
     }
 
-    public void setOneway(Boolean oneway) {
-        this.oneway = oneway;
+    public void setOway(Boolean oway) {
+        this.oway = oway;
     }
 
-    public String getNamespace() {
-        return namespace;
+    public String getNs() {
+        return ns;
     }
 
-    public void setNamespace(String namespace) {
-        this.namespace = namespace;
+    public void setNs(String ns) {
+        this.ns = ns;
     }
 
-    public Boolean getNamespaced() {
-        return namespaced;
+    public Boolean getNsd() {
+        return nsd;
     }
 
-    public void setNamespaced(Boolean namespaced) {
-        this.namespaced = namespaced;
+    public void setNsd(Boolean nsd) {
+        this.nsd = nsd;
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java
index 5d0a151..660f046 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java
@@ -16,20 +16,8 @@
  */
 package org.apache.rocketmq.common.rpc;
 
-public abstract class TopicQueueRequestHeader extends RpcRequestHeader {
-    //Physical or Logical
-    protected Boolean physical;
+public abstract class TopicQueueRequestHeader extends TopicRequestHeader {
 
-    public Boolean getPhysical() {
-        return physical;
-    }
-
-    public void setPhysical(Boolean physical) {
-        this.physical = physical;
-    }
-
-    public abstract String getTopic();
-    public abstract void setTopic(String topic);
     public abstract Integer getQueueId();
     public abstract void setQueueId(Integer queueId);
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/TopicRequestHeader.java
similarity index 71%
copy from common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java
copy to common/src/main/java/org/apache/rocketmq/common/rpc/TopicRequestHeader.java
index 5d0a151..a70cded 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/TopicRequestHeader.java
@@ -16,21 +16,17 @@
  */
 package org.apache.rocketmq.common.rpc;
 
-public abstract class TopicQueueRequestHeader extends RpcRequestHeader {
-    //Physical or Logical
-    protected Boolean physical;
-
-    public Boolean getPhysical() {
-        return physical;
-    }
-
-    public void setPhysical(Boolean physical) {
-        this.physical = physical;
-    }
+public abstract class TopicRequestHeader extends RpcRequestHeader {
+    //logical
+    protected Boolean lo;
 
     public abstract String getTopic();
     public abstract void setTopic(String topic);
-    public abstract Integer getQueueId();
-    public abstract void setQueueId(Integer queueId);
 
+    public Boolean getLo() {
+        return lo;
+    }
+    public void setLo(Boolean lo) {
+        this.lo = lo;
+    }
 }
diff --git a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
index fdefb06..e89dc3a 100644
--- a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
@@ -1,9 +1,12 @@
 package org.apache.rocketmq.test.statictopic;
 
-import com.alibaba.fastjson.JSON;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import org.apache.log4j.Logger;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.rpc.ClientMetadata;
@@ -86,71 +89,35 @@ public class StaticTopicIT extends BaseConf {
 
     }
 
-
-    @Test
-    public void testCreateProduceConsumeStaticTopic() throws Exception {
-        String topic = "static" + MQRandomUtils.getRandomTopic();
-        RMQNormalProducer producer = getProducer(nsAddr, topic);
-        RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
-
-        int queueNum = 10;
-        int msgEachQueue = 100;
-        //create static topic
-        Map<String, TopicConfigAndQueueMapping> localBrokerConfigMap = MQAdminTestUtils.createStaticTopic(topic, queueNum, getBrokers(), defaultMQAdminExt);
-        //check the static topic config
-        {
-            Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
-            Assert.assertEquals(brokerNum, remoteBrokerConfigMap.size());
-            for (Map.Entry<String, TopicConfigAndQueueMapping> entry: remoteBrokerConfigMap.entrySet())  {
-                String broker = entry.getKey();
-                TopicConfigAndQueueMapping configMapping = entry.getValue();
-                TopicConfigAndQueueMapping localConfigMapping = localBrokerConfigMap.get(broker);
-                Assert.assertNotNull(localConfigMapping);
-                Assert.assertEquals(configMapping, localConfigMapping);
-            }
-            TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap);
-            Map<Integer, TopicQueueMappingOne>  globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
-            Assert.assertEquals(queueNum, globalIdMap.size());
-        }
-        //check the route data
+    private void sendMessagesAndCheck(RMQNormalProducer producer, Set<String> targetBrokers, String topic, int queueNum, int msgEachQueue, int gen) throws Exception {
+        ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt);
         List<MessageQueue> messageQueueList = producer.getMessageQueue();
         Assert.assertEquals(queueNum, messageQueueList.size());
-        producer.setDebug(true);
         for (int i = 0; i < queueNum; i++) {
             MessageQueue messageQueue = messageQueueList.get(i);
             Assert.assertEquals(topic, messageQueue.getTopic());
-            Assert.assertEquals(i, messageQueue.getQueueId());
             Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
+            Assert.assertEquals(i, messageQueue.getQueueId());
+            String destBrokerName = clientMetadata.getBrokerNameFromMessageQueue(messageQueue);
+            Assert.assertTrue(targetBrokers.contains(destBrokerName));
         }
-        //send and consume the msg
         for(MessageQueue messageQueue: messageQueueList) {
             producer.send(msgEachQueue, messageQueue);
         }
+        Assert.assertEquals(0, producer.getSendErrorMsg().size());
         //leave the time to build the cq
-        Thread.sleep(500);
+        Thread.sleep(100);
         for(MessageQueue messageQueue: messageQueueList) {
             Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
-            Assert.assertEquals(msgEachQueue, defaultMQAdminExt.maxOffset(messageQueue));
+            Assert.assertEquals(msgEachQueue + gen * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, defaultMQAdminExt.maxOffset(messageQueue));
         }
-        Assert.assertEquals(msgEachQueue * queueNum, producer.getAllOriginMsg().size());
-        Assert.assertEquals(0, producer.getSendErrorMsg().size());
-
-        consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 3000);
-        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
-                consumer.getListener().getAllMsgBody()))
-                .containsExactlyElementsIn(producer.getAllMsgBody());
-        Map<Integer, List<MessageExt>> messagesByQueue = computeMessageByQueue(consumer.getListener().getAllOriginMsg());
-        Assert.assertEquals(queueNum, messagesByQueue.size());
-        for (int i = 0; i < queueNum; i++) {
-            List<MessageExt> messageExts = messagesByQueue.get(i);
-            Assert.assertEquals(msgEachQueue, messageExts.size());
-            for (int j = 0; j < msgEachQueue; j++) {
-                Assert.assertEquals(j, messageExts.get(j).getQueueOffset());
-            }
+        TopicStatsTable topicStatsTable = defaultMQAdminExt.examineTopicStats(topic);
+        for(MessageQueue messageQueue: messageQueueList) {
+            Assert.assertEquals(0, topicStatsTable.getOffsetTable().get(messageQueue).getMinOffset());
+            Assert.assertEquals(msgEachQueue + gen * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, topicStatsTable.getOffsetTable().get(messageQueue).getMaxOffset());
         }
     }
 
-
     private Map<Integer, List<MessageExt>> computeMessageByQueue(Collection<Object> msgs) {
         Map<Integer, List<MessageExt>> messagesByQueue = new HashMap<>();
         for (Object object : msgs) {
@@ -171,217 +138,133 @@ public class StaticTopicIT extends BaseConf {
         return messagesByQueue;
     }
 
-    @Test
-    public void testDoubleReadCheckConsumerOffset() throws Exception {
-        String topic = "static" + MQRandomUtils.getRandomTopic();
-        String group = initConsumerGroup();
-        RMQNormalProducer producer = getProducer(nsAddr, topic);
+    private void consumeMessagesAndCheck(RMQNormalProducer producer, RMQNormalConsumer consumer, String topic, int queueNum, int msgEachQueue, int startGen, int genNum) {
+        consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 30000);
+        /*System.out.println("produce:" + producer.getAllMsgBody().size());
+        System.out.println("consume:" + consumer.getListener().getAllMsgBody().size());*/
 
-        RMQNormalConsumer consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
-
-        //System.out.printf("Group:%s\n", consumer.getConsumerGroup());
-        //System.out.printf("Topic:%s\n", topic);
-
-        int queueNum = 10;
-        int msgEachQueue = 100;
-        //create static topic
-        {
-            Set<String> targetBrokers = new HashSet<>();
-            targetBrokers.add(broker1Name);
-            MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
-        }
-        //produce the messages
-        {
-            List<MessageQueue> messageQueueList = producer.getMessageQueue();
-            for(MessageQueue messageQueue: messageQueueList) {
-                producer.send(msgEachQueue, messageQueue);
-            }
-            Assert.assertEquals(0, producer.getSendErrorMsg().size());
-            Assert.assertEquals(msgEachQueue * queueNum, producer.getAllMsgBody().size());
-        }
-
-        consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 3000);
         assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
                 consumer.getListener().getAllMsgBody()))
                 .containsExactlyElementsIn(producer.getAllMsgBody());
-        producer.shutdown();
-        consumer.shutdown();
+        Map<Integer, List<MessageExt>> messagesByQueue = computeMessageByQueue(consumer.getListener().getAllOriginMsg());
+        Assert.assertEquals(queueNum, messagesByQueue.size());
+        for (int i = 0; i < queueNum; i++) {
+            List<MessageExt> messageExts = messagesByQueue.get(i);
+            /*for (MessageExt messageExt:messageExts) {
+                System.out.printf("%d %d\n", messageExt.getQueueId(), messageExt.getQueueOffset());
+            }*/
+            int totalEachQueue = msgEachQueue * genNum;
+            Assert.assertEquals(totalEachQueue, messageExts.size());
+            for (int j = 0; j < totalEachQueue; j++) {
+                MessageExt messageExt = messageExts.get(j);
+                int currGen = startGen + j / msgEachQueue;
+                Assert.assertEquals(topic, messageExt.getTopic());
+                Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageExt.getBrokerName());
+                Assert.assertEquals(i, messageExt.getQueueId());
+                Assert.assertEquals((j % msgEachQueue) + currGen * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, messageExt.getQueueOffset());
+            }
+        }
+    }
 
-        //remapping the static topic
-        {
-            Set<String> targetBrokers = new HashSet<>();
-            targetBrokers.add(broker2Name);
-            MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
 
-        }
-        //make the metadata
-        Thread.sleep(500);
-        //System.out.printf("Group:%s\n", consumer.getConsumerGroup());
+    @Test
+    public void testCreateProduceConsumeStaticTopic() throws Exception {
+        String topic = "static" + MQRandomUtils.getRandomTopic();
+        RMQNormalProducer producer = getProducer(nsAddr, topic);
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
 
+        int queueNum = 10;
+        int msgEachQueue = 100;
+        //create static topic
+        Map<String, TopicConfigAndQueueMapping> localBrokerConfigMap = MQAdminTestUtils.createStaticTopic(topic, queueNum, getBrokers(), defaultMQAdminExt);
+        //check the static topic config
         {
-            producer = getProducer(nsAddr, topic);
-            ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt);
-            //just refresh the metadata
-            List<MessageQueue> messageQueueList = producer.getMessageQueue();
-            for(MessageQueue messageQueue: messageQueueList) {
-                producer.send(msgEachQueue, messageQueue);
-                Assert.assertEquals(broker2Name, clientMetadata.getBrokerNameFromMessageQueue(messageQueue));
-            }
-            Assert.assertEquals(0, producer.getSendErrorMsg().size());
-            Assert.assertEquals(msgEachQueue * queueNum, producer.getAllMsgBody().size());
-            for(MessageQueue messageQueue: messageQueueList) {
-                Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
-                Assert.assertEquals(msgEachQueue + TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, defaultMQAdminExt.maxOffset(messageQueue));
-            }
-            //leave the time to build the cq
-            Thread.sleep(100);
-        }
-        {
-            consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
-            consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 6000);
-            //System.out.printf("Consume %d\n", consumer.getListener().getAllMsgBody().size());
-            assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
-                    consumer.getListener().getAllMsgBody()))
-                    .containsExactlyElementsIn(producer.getAllMsgBody());
-
-            Map<Integer, List<MessageExt>> messagesByQueue = computeMessageByQueue(consumer.getListener().getAllOriginMsg());
-
-            Assert.assertEquals(queueNum, messagesByQueue.size());
-            for (int i = 0; i < queueNum; i++) {
-                List<MessageExt> messageExts = messagesByQueue.get(i);
-                Assert.assertEquals(msgEachQueue, messageExts.size());
-                for (int j = 0; j < msgEachQueue; j++) {
-                    Assert.assertEquals(j + TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, messageExts.get(j).getQueueOffset());
-                }
+            Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
+            Assert.assertEquals(brokerNum, remoteBrokerConfigMap.size());
+            for (Map.Entry<String, TopicConfigAndQueueMapping> entry: remoteBrokerConfigMap.entrySet())  {
+                String broker = entry.getKey();
+                TopicConfigAndQueueMapping configMapping = entry.getValue();
+                TopicConfigAndQueueMapping localConfigMapping = localBrokerConfigMap.get(broker);
+                Assert.assertNotNull(localConfigMapping);
+                Assert.assertEquals(configMapping, localConfigMapping);
             }
+            TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap);
+            Map<Integer, TopicQueueMappingOne>  globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
+            Assert.assertEquals(queueNum, globalIdMap.size());
         }
+        //send and check
+        sendMessagesAndCheck(producer, getBrokers(), topic, queueNum, msgEachQueue, 0);
+        //consume and check
+        consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 1);
     }
 
+
     @Test
     public void testRemappingProduceConsumeStaticTopic() throws Exception {
         String topic = "static" + MQRandomUtils.getRandomTopic();
         RMQNormalProducer producer = getProducer(nsAddr, topic);
         RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
 
-
-        int queueNum = 10;
+        int queueNum = 1;
         int msgEachQueue = 100;
-        //create static topic
+        //create send consume
         {
-            Set<String> targetBrokers = new HashSet<>();
-            targetBrokers.add(broker1Name);
+            Set<String> targetBrokers = ImmutableSet.of(broker1Name);
             MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
+            sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 0);
+            consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 1);
         }
-        //System.out.printf("%s %s\n", broker1Name, clientMetadata.findMasterBrokerAddr(broker1Name));
-        //System.out.printf("%s %s\n", broker2Name, clientMetadata.findMasterBrokerAddr(broker2Name));
-
-        //produce the messages
-        {
-            List<MessageQueue> messageQueueList = producer.getMessageQueue();
-            for (int i = 0; i < queueNum; i++) {
-                MessageQueue messageQueue = messageQueueList.get(i);
-                Assert.assertEquals(i, messageQueue.getQueueId());
-                Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
-            }
-            for(MessageQueue messageQueue: messageQueueList) {
-                producer.send(msgEachQueue, messageQueue);
-            }
-            Assert.assertEquals(0, producer.getSendErrorMsg().size());
-            //leave the time to build the cq
-            Thread.sleep(100);
-            for(MessageQueue messageQueue: messageQueueList) {
-                //Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
-                Assert.assertEquals(msgEachQueue, defaultMQAdminExt.maxOffset(messageQueue));
-            }
-        }
-
-        consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 3000);
-        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
-                consumer.getListener().getAllMsgBody()))
-                .containsExactlyElementsIn(producer.getAllMsgBody());
-
+        System.out.println("=============================================================");
         //remapping the static topic
         {
-            Set<String> targetBrokers = new HashSet<>();
-            targetBrokers.add(broker2Name);
+            Set<String> targetBrokers = ImmutableSet.of(broker2Name);
             MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
             Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
-
             TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap);
             Map<Integer, TopicQueueMappingOne>  globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
             Assert.assertEquals(queueNum, globalIdMap.size());
             for (TopicQueueMappingOne mappingOne: globalIdMap.values()) {
                 Assert.assertEquals(broker2Name, mappingOne.getBname());
+                Assert.assertEquals(TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, mappingOne.getItems().get(mappingOne.getItems().size() - 1).getLogicOffset());
             }
-        }
-        //leave the time to refresh the metadata
-        Thread.sleep(500);
-        producer.setDebug(true);
-        {
-            ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt);
-            List<MessageQueue> messageQueueList = producer.getMessageQueue();
-            for (int i = 0; i < queueNum; i++) {
-                MessageQueue messageQueue = messageQueueList.get(i);
-                Assert.assertEquals(i, messageQueue.getQueueId());
-                Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
-                String destBrokerName = clientMetadata.getBrokerNameFromMessageQueue(messageQueue);
-                Assert.assertEquals(destBrokerName, broker2Name);
-            }
-
-            for(MessageQueue messageQueue: messageQueueList) {
-                producer.send(msgEachQueue, messageQueue);
-            }
-            Assert.assertEquals(0, producer.getSendErrorMsg().size());
-            //leave the time to build the cq
-            Thread.sleep(100);
-            for(MessageQueue messageQueue: messageQueueList) {
-                Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
-                Assert.assertEquals(msgEachQueue + TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, defaultMQAdminExt.maxOffset(messageQueue));
-            }
-        }
-        {
-            consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 30000);
-            assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
-                    consumer.getListener().getAllMsgBody()))
-                    .containsExactlyElementsIn(producer.getAllMsgBody());
-            Map<Integer, List<MessageExt>> messagesByQueue = computeMessageByQueue(consumer.getListener().getAllOriginMsg());
-            Assert.assertEquals(queueNum, messagesByQueue.size());
-            for (int i = 0; i < queueNum; i++) {
-                List<MessageExt> messageExts = messagesByQueue.get(i);
-                Assert.assertEquals(msgEachQueue * 2, messageExts.size());
-                for (int j = 0; j < msgEachQueue; j++) {
-                    Assert.assertEquals(j, messageExts.get(j).getQueueOffset());
-                }
-                for (int j = msgEachQueue; j < msgEachQueue * 2; j++) {
-                    Assert.assertEquals(j + TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE - msgEachQueue, messageExts.get(j).getQueueOffset());
-                }
-            }
+            Thread.sleep(500);
+            sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 1);
+            consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 2);
         }
     }
 
 
-    public void sendMessagesAndCheck(RMQNormalProducer producer, String broker, String topic, int queueNum, int msgEachQueue, long baseOffset) throws Exception {
-        ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt);
-        List<MessageQueue> messageQueueList = producer.getMessageQueue();
-        Assert.assertEquals(queueNum, messageQueueList.size());
-        for (int i = 0; i < queueNum; i++) {
-            MessageQueue messageQueue = messageQueueList.get(i);
-            Assert.assertEquals(i, messageQueue.getQueueId());
-            Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
-            String destBrokerName = clientMetadata.getBrokerNameFromMessageQueue(messageQueue);
-            Assert.assertEquals(destBrokerName, broker);
-        }
+    @Test
+    public void testDoubleReadCheckConsumerOffset() throws Exception {
+        String topic = "static" + MQRandomUtils.getRandomTopic();
+        String group = initConsumerGroup();
+        RMQNormalProducer producer = getProducer(nsAddr, topic);
+        RMQNormalConsumer consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
 
-        for(MessageQueue messageQueue: messageQueueList) {
-            producer.send(msgEachQueue, messageQueue);
+        int queueNum = 10;
+        int msgEachQueue = 100;
+        //create static topic
+        {
+            Set<String> targetBrokers = ImmutableSet.of(broker1Name);
+            MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
+            sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 0);
+            consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 1);
         }
-        Assert.assertEquals(0, producer.getSendErrorMsg().size());
-        //leave the time to build the cq
-        Thread.sleep(100);
-        for(MessageQueue messageQueue: messageQueueList) {
-            Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
-            Assert.assertEquals(msgEachQueue + baseOffset, defaultMQAdminExt.maxOffset(messageQueue));
+        producer.shutdown();
+        consumer.shutdown();
+        //use a new producer
+        producer = getProducer(nsAddr, topic);
+
+        List<String> brokers = ImmutableList.of(broker2Name, broker3Name, broker1Name);
+        for (int i = 0; i < brokers.size(); i++) {
+            Set<String> targetBrokers = ImmutableSet.of(brokers.get(i));
+            MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
+            //make the metadata
+            Thread.sleep(500);
+            sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, i + 1);
         }
+        consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
+        consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 1, brokers.size());
     }
 
 
@@ -393,32 +276,29 @@ public class StaticTopicIT extends BaseConf {
         int msgEachQueue = 100;
         //create to broker1Name
         {
-            Set<String> targetBrokers = new HashSet<>();
-            targetBrokers.add(broker1Name);
+            Set<String> targetBrokers = ImmutableSet.of(broker1Name);
             MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
             //leave the time to refresh the metadata
             Thread.sleep(500);
-            sendMessagesAndCheck(producer, broker1Name, topic, queueNum, msgEachQueue, 0L);
+            sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 0);
         }
 
         //remapping to broker2Name
         {
-            Set<String> targetBrokers = new HashSet<>();
-            targetBrokers.add(broker2Name);
+            Set<String> targetBrokers = ImmutableSet.of(broker2Name);
             MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
             //leave the time to refresh the metadata
             Thread.sleep(500);
-            sendMessagesAndCheck(producer, broker2Name, topic, queueNum, msgEachQueue, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE);
+            sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 1);
         }
 
         //remapping to broker3Name
         {
-            Set<String> targetBrokers = new HashSet<>();
-            targetBrokers.add(broker3Name);
+            Set<String> targetBrokers = ImmutableSet.of(broker3Name);
             MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
             //leave the time to refresh the metadata
             Thread.sleep(500);
-            sendMessagesAndCheck(producer, broker3Name, topic, queueNum, msgEachQueue, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE * 2);
+            sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 2);
         }
 
         // 1 -> 2 -> 3, currently 1 should not has any mappings
@@ -469,10 +349,7 @@ public class StaticTopicIT extends BaseConf {
             for (List<LogicQueueMappingItem> items : config3.getMappingDetail().getHostedQueues().values()) {
                 Assert.assertEquals(1, items.size());
             }
-
         }
-
-
     }
 
 
@@ -482,42 +359,18 @@ public class StaticTopicIT extends BaseConf {
         RMQNormalProducer producer = getProducer(nsAddr, topic);
         int queueNum = 10;
         int msgEachQueue = 100;
-        //create static topic
+        //create and send
         {
-            Set<String> targetBrokers = new HashSet<>();
-            targetBrokers.add(broker1Name);
+            Set<String> targetBrokers = ImmutableSet.of(broker1Name);
             MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
-        }
-        //System.out.printf("%s %s\n", broker1Name, clientMetadata.findMasterBrokerAddr(broker1Name));
-        //System.out.printf("%s %s\n", broker2Name, clientMetadata.findMasterBrokerAddr(broker2Name));
-
-        //produce the messages
-        {
-            List<MessageQueue> messageQueueList = producer.getMessageQueue();
-            for (int i = 0; i < queueNum; i++) {
-                MessageQueue messageQueue = messageQueueList.get(i);
-                Assert.assertEquals(i, messageQueue.getQueueId());
-                Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
-            }
-            for(MessageQueue messageQueue: messageQueueList) {
-                producer.send(msgEachQueue, messageQueue);
-            }
-            Assert.assertEquals(0, producer.getSendErrorMsg().size());
-            //leave the time to build the cq
-            Thread.sleep(100);
-            for(MessageQueue messageQueue: messageQueueList) {
-                //Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
-                Assert.assertEquals(msgEachQueue, defaultMQAdminExt.maxOffset(messageQueue));
-            }
+            sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 0);
         }
 
         //remapping the static topic with -1 logic offset
         {
-            Set<String> targetBrokers = new HashSet<>();
-            targetBrokers.add(broker2Name);
+            Set<String> targetBrokers = ImmutableSet.of(broker2Name);
             MQAdminTestUtils.remappingStaticTopicWithNegativeLogicOffset(topic, targetBrokers, defaultMQAdminExt);
             Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
-
             TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap);
             Map<Integer, TopicQueueMappingOne>  globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
             Assert.assertEquals(queueNum, globalIdMap.size());
@@ -525,32 +378,10 @@ public class StaticTopicIT extends BaseConf {
                 Assert.assertEquals(broker2Name, mappingOne.getBname());
                 Assert.assertEquals(-1, mappingOne.getItems().get(mappingOne.getItems().size() - 1).getLogicOffset());
             }
-        }
-        //leave the time to refresh the metadata
-        Thread.sleep(500);
-        producer.setDebug(true);
-        {
-            ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt);
-            List<MessageQueue> messageQueueList = producer.getMessageQueue();
-            for (int i = 0; i < queueNum; i++) {
-                MessageQueue messageQueue = messageQueueList.get(i);
-                Assert.assertEquals(i, messageQueue.getQueueId());
-                String destBrokerName = clientMetadata.getBrokerNameFromMessageQueue(messageQueue);
-                Assert.assertEquals(destBrokerName, broker2Name);
-            }
-
-            for(MessageQueue messageQueue: messageQueueList) {
-                producer.send(msgEachQueue, messageQueue);
-            }
-            Assert.assertEquals(0, producer.getSendErrorMsg().size());
-            Assert.assertEquals(queueNum * msgEachQueue * 2, producer.getAllOriginMsg().size());
-            //leave the time to build the cq
-            Thread.sleep(100);
-            for(MessageQueue messageQueue: messageQueueList) {
-                Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
-                //the max offset should still be msgEachQueue
-                Assert.assertEquals(msgEachQueue, defaultMQAdminExt.maxOffset(messageQueue));
-            }
+            //leave the time to refresh the metadata
+            Thread.sleep(500);
+            //here the gen should be 0
+            sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 0);
         }
     }
 

[rocketmq] 01/02: Try using the new style to handle get min offset

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 8b747f97f99bf40a6b0d6b813fb91b5f1f673c67
Author: dongeforever <do...@apache.org>
AuthorDate: Wed Dec 8 10:58:47 2021 +0800

    Try using the new style to handle get min offset
---
 .../broker/processor/AdminBrokerProcessor.java     | 54 +++++++++++++---------
 .../apache/rocketmq/common/rpc/RpcResponse.java    |  2 +-
 2 files changed, 32 insertions(+), 24 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 270c953..cca96f1 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -20,6 +20,7 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.util.concurrent.CompleteFuture;
 import org.apache.rocketmq.acl.AccessValidator;
 import org.apache.rocketmq.acl.plain.PlainAccessValidator;
 import org.apache.rocketmq.broker.BrokerController;
@@ -103,6 +104,8 @@ import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHead
 import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader;
 import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.rpc.RpcClientUtils;
+import org.apache.rocketmq.common.rpc.RpcException;
 import org.apache.rocketmq.common.rpc.RpcRequest;
 import org.apache.rocketmq.common.rpc.RpcResponse;
 import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
@@ -146,7 +149,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
 
 import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
 
@@ -764,15 +770,15 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         return response;
     }
 
-    private RemotingCommand rewriteRequestForStaticTopic(GetMinOffsetRequestHeader requestHeader, TopicQueueMappingContext mappingContext)  {
+    private CompletableFuture<RpcResponse> handleGetMinOffsetForStaticTopic(GetMinOffsetRequestHeader requestHeader, TopicQueueMappingContext mappingContext)  {
         if (mappingContext.getMappingDetail() == null) {
             return null;
         }
-
         TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
         if (!mappingContext.isLeader()) {
             //this may not
-            return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
+            return CompletableFuture.completedFuture(new RpcResponse(new RpcException(ResponseCode.NOT_LEADER_FOR_QUEUE,
+                    String.format("%s-%d is not leader in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()))));
         };
 
         LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), 0L, true);
@@ -796,38 +802,40 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
             }
             long offset = mappingItem.computeStaticQueueOffsetLoosely(physicalOffset);
 
-            final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
-            final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader();
+            final GetMinOffsetResponseHeader responseHeader = new GetMinOffsetResponseHeader();
             responseHeader.setOffset(offset);
-            response.setCode(ResponseCode.SUCCESS);
-            response.setRemark(null);
-            return response;
+            return CompletableFuture.completedFuture(new RpcResponse(ResponseCode.SUCCESS, responseHeader, null));
         } catch (Throwable t) {
             log.error("rewriteRequestForStaticTopic failed", t);
-            return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
+            return CompletableFuture.completedFuture(new RpcResponse(new RpcException(ResponseCode.SYSTEM_ERROR, t.getMessage(), t)));
         }
     }
 
-    private RemotingCommand getMinOffset(ChannelHandlerContext ctx,
-        RemotingCommand request) throws RemotingCommandException {
-        final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
-        final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader();
-        final GetMinOffsetRequestHeader requestHeader =
-            (GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
-
-
+    private CompletableFuture<RpcResponse>  handleGetMinOffset(RpcRequest request) {
+        assert request.getCode() == RequestCode.GET_MIN_OFFSET;
+        GetMinOffsetRequestHeader requestHeader = (GetMinOffsetRequestHeader) request.getHeader();
         TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false);
-        RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
+        CompletableFuture<RpcResponse> rewriteResult = handleGetMinOffsetForStaticTopic(requestHeader, mappingContext);
         if (rewriteResult != null) {
             return rewriteResult;
         }
-
+        final GetMinOffsetResponseHeader responseHeader = new GetMinOffsetResponseHeader();
         long offset = this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId());
-
         responseHeader.setOffset(offset);
-        response.setCode(ResponseCode.SUCCESS);
-        response.setRemark(null);
-        return response;
+        return CompletableFuture.completedFuture(new RpcResponse(ResponseCode.SUCCESS, responseHeader, null));
+    }
+
+    private RemotingCommand getMinOffset(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+        final GetMinOffsetRequestHeader requestHeader =
+            (GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
+        try {
+            CompletableFuture<RpcResponse> responseFuture = handleGetMinOffset(new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null));
+            RpcResponse  rpcResponse = responseFuture.get();
+            return RpcClientUtils.createCommandForRpcResponse(rpcResponse);
+        } catch (Throwable t) {
+            return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
+        }
     }
 
     private RemotingCommand rewriteRequestForStaticTopic(GetEarliestMsgStoretimeRequestHeader requestHeader, TopicQueueMappingContext mappingContext)  {
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
index 2f61329..5155bd2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
@@ -34,7 +34,7 @@ public class RpcResponse   {
         this.body = body;
     }
 
-    RpcResponse(RpcException rpcException) {
+    public RpcResponse(RpcException rpcException) {
         this.code = rpcException.getErrorCode();
         this.exception = rpcException;
     }