You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/08 09:28:17 UTC

[rocketmq] 02/02: Finish the test for topicStats

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 48db31b4815af2d2a3c07c1944a69f572e4425d7
Author: dongeforever <do...@apache.org>
AuthorDate: Wed Dec 8 17:27:55 2021 +0800

    Finish the test for topicStats
---
 .../broker/processor/AdminBrokerProcessor.java     |  97 ++++-
 .../broker/processor/ConsumerManageProcessor.java  |  12 +-
 .../broker/processor/PullMessageProcessor.java     |  16 +-
 .../broker/topic/TopicQueueMappingManager.java     |  34 +-
 .../header/GetTopicStatsInfoRequestHeader.java     |   5 +-
 .../apache/rocketmq/common/rpc/RequestBuilder.java |  16 +-
 .../apache/rocketmq/common/rpc/RpcClientImpl.java  |   1 +
 .../rocketmq/common/rpc/RpcRequestHeader.java      |  32 +-
 .../common/rpc/TopicQueueRequestHeader.java        |  14 +-
 ...eRequestHeader.java => TopicRequestHeader.java} |  22 +-
 .../rocketmq/test/statictopic/StaticTopicIT.java   | 405 ++++++---------------
 11 files changed, 278 insertions(+), 376 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index cca96f1..fb4a711 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -30,6 +30,7 @@ import org.apache.rocketmq.broker.filter.ConsumerFilterData;
 import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
 import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
 import org.apache.rocketmq.common.AclConfig;
+import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.PlainAccessConfig;
@@ -104,6 +105,7 @@ import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHead
 import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader;
 import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.rpc.RpcClient;
 import org.apache.rocketmq.common.rpc.RpcClientUtils;
 import org.apache.rocketmq.common.rpc.RpcException;
 import org.apache.rocketmq.common.rpc.RpcRequest;
@@ -159,9 +161,13 @@ import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorRe
 public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private final BrokerController brokerController;
+    private final RpcClient rpcClient;
+    private final BrokerConfig brokerConfig;
 
     public AdminBrokerProcessor(final BrokerController brokerController) {
         this.brokerController = brokerController;
+        this.brokerConfig = brokerController.getBrokerConfig();
+        this.rpcClient = brokerController.getBrokerOuterAPI().getRpcClient();
     }
 
     @Override
@@ -650,7 +656,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
                         break;
                     }
                 } else {
-                    requestHeader.setPhysical(true);
+                    requestHeader.setLo(false);
                     requestHeader.setTimestamp(timestamp);
                     requestHeader.setQueueId(item.getQueueId());
                     requestHeader.setBname(item.getBname());
@@ -720,7 +726,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
             assert maxItem != null;
             assert maxItem.getLogicOffset() >= 0;
             requestHeader.setBname(maxItem.getBname());
-            requestHeader.setPhysical(true);
+            requestHeader.setLo(false);
             requestHeader.setQueueId(mappingItem.getQueueId());
 
             long maxPhysicalOffset = Long.MAX_VALUE;
@@ -770,7 +776,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         return response;
     }
 
-    private CompletableFuture<RpcResponse> handleGetMinOffsetForStaticTopic(GetMinOffsetRequestHeader requestHeader, TopicQueueMappingContext mappingContext)  {
+    private CompletableFuture<RpcResponse> handleGetMinOffsetForStaticTopic(RpcRequest request, TopicQueueMappingContext mappingContext)  {
         if (mappingContext.getMappingDetail() == null) {
             return null;
         }
@@ -778,14 +784,14 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         if (!mappingContext.isLeader()) {
             //this may not
             return CompletableFuture.completedFuture(new RpcResponse(new RpcException(ResponseCode.NOT_LEADER_FOR_QUEUE,
-                    String.format("%s-%d is not leader in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()))));
+                    String.format("%s-%d is not leader in broker %s, request code %d", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname(), request.getCode()))));
         };
-
+        GetMinOffsetRequestHeader requestHeader = (GetMinOffsetRequestHeader) request.getHeader();
         LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), 0L, true);
         assert  mappingItem != null;
         try {
             requestHeader.setBname(mappingItem.getBname());
-            requestHeader.setPhysical(true);
+            requestHeader.setLo(false);
             requestHeader.setQueueId(mappingItem.getQueueId());
             long physicalOffset;
             //run in local
@@ -815,7 +821,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         assert request.getCode() == RequestCode.GET_MIN_OFFSET;
         GetMinOffsetRequestHeader requestHeader = (GetMinOffsetRequestHeader) request.getHeader();
         TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false);
-        CompletableFuture<RpcResponse> rewriteResult = handleGetMinOffsetForStaticTopic(requestHeader, mappingContext);
+        CompletableFuture<RpcResponse> rewriteResult = handleGetMinOffsetForStaticTopic(request, mappingContext);
         if (rewriteResult != null) {
             return rewriteResult;
         }
@@ -851,7 +857,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         assert mappingItem != null;
         try {
             requestHeader.setBname(mappingItem.getBname());
-            requestHeader.setPhysical(true);
+            requestHeader.setLo(false);
             RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_EARLIEST_MSG_STORETIME, requestHeader, null);
             //TODO check if it is in current broker
             RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
@@ -1006,6 +1012,70 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         return response;
     }
 
+    private RpcResponse handleGetTopicStatsInfoForStaticTopic(RpcRequest request, TopicQueueMappingContext mappingContext) {
+        try {
+            assert request.getCode() == RequestCode.GET_TOPIC_STATS_INFO;
+            if (mappingContext.getMappingDetail() == null) {
+                return null;
+            }
+            final GetTopicStatsInfoRequestHeader requestHeader = (GetTopicStatsInfoRequestHeader) request.getHeader();
+            String topic = requestHeader.getTopic();
+            TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
+            Map<Integer, LogicQueueMappingItem[]> qidItemMap = new HashMap<>();
+            Set<String> brokers = new HashSet<>();
+            mappingDetail.getHostedQueues().forEach((qid, items) -> {
+                if (TopicQueueMappingUtils.checkIfLeader(items, mappingDetail)) {
+                    LogicQueueMappingItem[] itemPair = new LogicQueueMappingItem[2];
+                    itemPair[0] = TopicQueueMappingUtils.findLogicQueueMappingItem(items, 0, true);
+                    itemPair[1] = TopicQueueMappingUtils.findLogicQueueMappingItem(items, Long.MAX_VALUE, true);
+                    assert itemPair[0] != null && itemPair[1] != null;
+                    qidItemMap.put(qid, itemPair);
+                    brokers.add(itemPair[0].getBname());
+                    brokers.add(itemPair[1].getBname());
+                }
+            });
+            Map<String, TopicStatsTable> statsTable = new HashMap<>();
+            for (String broker: brokers) {
+                GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader();
+                header.setTopic(topic);
+                header.setBname(broker);
+                header.setLo(false);
+                RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null);
+                RpcResponse rpcResponse = rpcClient.invoke(rpcRequest, brokerConfig.getForwardTimeout()).get();
+                if (rpcResponse.getException() != null) {
+                    throw rpcResponse.getException();
+                }
+                statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody());
+            }
+            TopicStatsTable topicStatsTable = new TopicStatsTable();
+            qidItemMap.forEach( (qid, itemPair) -> {
+                LogicQueueMappingItem minItem = itemPair[0];
+                LogicQueueMappingItem maxItem = itemPair[1];
+                TopicOffset minTopicOffset = statsTable.get(minItem.getBname()).getOffsetTable().get(new MessageQueue(topic, minItem.getBname(), minItem.getQueueId()));
+                TopicOffset maxTopicOffset = statsTable.get(maxItem.getBname()).getOffsetTable().get(new MessageQueue(topic, maxItem.getBname(), maxItem.getQueueId()));
+
+                assert  minTopicOffset != null && maxTopicOffset != null;
+
+                long min = minItem.computeStaticQueueOffsetLoosely(minTopicOffset.getMinOffset());
+                if (min < 0)
+                    min = 0;
+                long max = maxItem.computeStaticQueueOffsetStrictly(maxTopicOffset.getMaxOffset());
+                if (max < 0)
+                    max = 0;
+                long timestamp = maxTopicOffset.getLastUpdateTimestamp();
+
+                TopicOffset topicOffset = new TopicOffset();
+                topicOffset.setMinOffset(min);
+                topicOffset.setMaxOffset(max);
+                topicOffset.setLastUpdateTimestamp(timestamp);
+                topicStatsTable.getOffsetTable().put(new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, qid), topicOffset);
+            });
+            return new RpcResponse(ResponseCode.SUCCESS, null, topicStatsTable);
+        } catch (Throwable t) {
+            return new RpcResponse(new RpcException(ResponseCode.SYSTEM_ERROR, t.getMessage(), t));
+        }
+    }
+
     private RemotingCommand getTopicStatsInfo(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
@@ -1019,8 +1089,17 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
             response.setRemark("topic[" + topic + "] not exist");
             return response;
         }
-
         TopicStatsTable topicStatsTable = new TopicStatsTable();
+        TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false);
+        RpcResponse rpcResponse = handleGetTopicStatsInfoForStaticTopic(new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, requestHeader, null), mappingContext);
+        if (rpcResponse != null) {
+            if (rpcResponse.getException() != null) {
+                return RpcClientUtils.createCommandForRpcResponse(rpcResponse);
+            } else {
+                topicStatsTable.getOffsetTable().putAll(((TopicStatsTable)rpcResponse.getBody()).getOffsetTable());
+            }
+        }
+
         for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
             MessageQueue mq = new MessageQueue();
             mq.setTopic(topic);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
index 0443396..66abe62 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -153,11 +153,8 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
             //by default, it is -1
             long offset = -1;
             //double read, first from leader, then from second leader
-            for (int i = 1; i <= 2; i++) {
-                if (itemList.size() - i < 0) {
-                    break;
-                }
-                LogicQueueMappingItem mappingItem = itemList.get(itemList.size() - i);
+            for (int i = itemList.size() - 1; i >= 0; i--) {
+                LogicQueueMappingItem mappingItem = itemList.get(i);
                 if (mappingItem.getBname().equals(mappingDetail.getBname())) {
                     offset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(), requestHeader.getTopic(), mappingItem.getQueueId());
                     if (offset >= 0) {
@@ -170,7 +167,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
                     //maybe we need to reconstruct an object
                     requestHeader.setBname(mappingItem.getBname());
                     requestHeader.setQueueId(mappingItem.getQueueId());
-                    requestHeader.setPhysical(true);
+                    requestHeader.setLo(false);
                     requestHeader.setSetZeroIfNotFound(false);
                     RpcRequest rpcRequest = new RpcRequest(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader, null);
                     RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
@@ -179,7 +176,8 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
                     }
                     if (rpcResponse.getCode() == ResponseCode.SUCCESS) {
                         offset = ((QueryConsumerOffsetResponseHeader) rpcResponse.getHeader()).getOffset();
-                    } else if (rpcResponse.getCode() == ResponseCode.PULL_NOT_FOUND){
+                        break;
+                    } else if (rpcResponse.getCode() == ResponseCode.QUERY_NOT_FOUND){
                         continue;
                     } else {
                         //this should not happen
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 9b8135e..0f1ca23 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -134,18 +134,14 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
                     && requestHeader.getMaxMsgNums() != null) {
                 requestHeader.setMaxMsgNums((int) Math.min(mappingItem.getEndOffset() - mappingItem.getStartOffset(), requestHeader.getMaxMsgNums()));
             }
-            int sysFlag = requestHeader.getSysFlag();
-            if (!mappingContext.isLeader()) {
-                sysFlag = PullSysFlag.clearCommitOffsetFlag(sysFlag);
-                requestHeader.setSysFlag(sysFlag);
-            }
 
             if (mappingDetail.getBname().equals(bname)) {
                 //just let it go, do the local pull process
                 return null;
             }
 
-            requestHeader.setPhysical(true);
+            int sysFlag = requestHeader.getSysFlag();
+            requestHeader.setLo(false);
             requestHeader.setBname(bname);
             sysFlag = PullSysFlag.clearSuspendFlag(sysFlag);
             sysFlag = PullSysFlag.clearCommitOffsetFlag(sysFlag);
@@ -189,11 +185,13 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
             long minOffset = responseHeader.getMinOffset();
             long maxOffset = responseHeader.getMaxOffset();
             int responseCode = code;
+
             //consider the following situations
             // 1. read from slave, currently not supported
             // 2. the middle queue is truncated because of deleting commitlog
             if (code != ResponseCode.SUCCESS) {
                 //note the currentItem maybe both the leader and  the earliest
+                boolean isRevised = false;
                 if (leaderItem.getGen() == currentItem.getGen()) {
                     //read the leader
                     if (requestOffset > maxOffset) {
@@ -228,6 +226,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
                         //just move to another item
                         LogicQueueMappingItem nextItem = TopicQueueMappingUtils.findNext(mappingContext.getMappingItemList(), currentItem, true);
                         if (nextItem != null) {
+                            isRevised = true;
                             currentItem = nextItem;
                             nextBeginOffset = currentItem.getStartOffset();
                             minOffset = currentItem.getStartOffset();
@@ -244,7 +243,8 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
                 }
 
                 //read from the middle item, ignore the PULL_OFFSET_MOVED
-                if (leaderItem.getGen() != currentItem.getGen()
+                if (!isRevised
+                    && leaderItem.getGen() != currentItem.getGen()
                     && earlistItem.getGen() != currentItem.getGen()) {
                     if (requestOffset < minOffset) {
                         nextBeginOffset = minOffset;
@@ -289,7 +289,6 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
                 return null;
             }
         } catch (Throwable t) {
-            t.printStackTrace();
             return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
         }
     }
@@ -443,6 +442,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
             return response;
         }
 
+
         MessageFilter messageFilter;
         if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
             messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index e76d25e..6965053 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -30,6 +30,7 @@ import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader;
 import org.apache.rocketmq.common.rpc.RpcRequest;
 import org.apache.rocketmq.common.rpc.RpcResponse;
 import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
+import org.apache.rocketmq.common.rpc.TopicRequestHeader;
 import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
 import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
@@ -190,28 +191,37 @@ public class TopicQueueMappingManager extends ConfigManager {
         return dataVersion;
     }
 
-    public TopicQueueMappingContext buildTopicQueueMappingContext(TopicQueueRequestHeader requestHeader) {
+    public TopicQueueMappingContext buildTopicQueueMappingContext(TopicRequestHeader requestHeader) {
         return buildTopicQueueMappingContext(requestHeader, false);
     }
 
     //Do not return a null context
-    public TopicQueueMappingContext buildTopicQueueMappingContext(TopicQueueRequestHeader requestHeader, boolean selectOneWhenMiss) {
-        if (requestHeader.getPhysical() != null
-                && Boolean.TRUE.equals(requestHeader.getPhysical())) {
-            return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null);
+    public TopicQueueMappingContext buildTopicQueueMappingContext(TopicRequestHeader requestHeader, boolean selectOneWhenMiss) {
+        //should disable logic queue explicitly, otherwise the old client may cause dirty data to newly created static topic
+        if (requestHeader.getLo() != null
+                && Boolean.FALSE.equals(requestHeader.getLo())) {
+            return new TopicQueueMappingContext(requestHeader.getTopic(), null, null, null, null);
         }
-        TopicQueueMappingDetail mappingDetail = getTopicQueueMapping(requestHeader.getTopic());
+        String topic = requestHeader.getTopic();
+        Integer globalId = null;
+        if (requestHeader instanceof  TopicQueueRequestHeader) {
+            globalId = ((TopicQueueRequestHeader) requestHeader).getQueueId();
+        }
+
+        TopicQueueMappingDetail mappingDetail = getTopicQueueMapping(topic);
         if (mappingDetail == null) {
             //it is not static topic
-            return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null);
+            return new TopicQueueMappingContext(topic, null, null, null, null);
         }
-
         assert mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName());
 
+        if (globalId == null) {
+            return new TopicQueueMappingContext(topic, null, mappingDetail, null, null);
+        }
+
         //If not find mappingItem, it encounters some errors
-        Integer globalId = requestHeader.getQueueId();
         if (globalId < 0 && !selectOneWhenMiss) {
-            return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, mappingDetail, null, null);
+            return new TopicQueueMappingContext(topic, globalId, mappingDetail, null, null);
         }
 
         if (globalId < 0) {
@@ -224,7 +234,7 @@ public class TopicQueueMappingManager extends ConfigManager {
             }
         }
         if (globalId < 0) {
-            return new TopicQueueMappingContext(requestHeader.getTopic(), globalId,  mappingDetail, null, null);
+            return new TopicQueueMappingContext(topic, globalId,  mappingDetail, null, null);
         }
 
         List<LogicQueueMappingItem> mappingItemList = TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId);
@@ -233,7 +243,7 @@ public class TopicQueueMappingManager extends ConfigManager {
                 && mappingItemList.size() > 0) {
             leaderItem = mappingItemList.get(mappingItemList.size() - 1);
         }
-        return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, mappingDetail, mappingItemList, leaderItem);
+        return new TopicQueueMappingContext(topic, globalId, mappingDetail, mappingItemList, leaderItem);
     }
 
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicStatsInfoRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicStatsInfoRequestHeader.java
index 8e921b2..f98e150 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicStatsInfoRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicStatsInfoRequestHeader.java
@@ -17,12 +17,11 @@
 
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.common.rpc.RpcRequestHeader;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.TopicRequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class GetTopicStatsInfoRequestHeader extends RpcRequestHeader {
+public class GetTopicStatsInfoRequestHeader extends TopicRequestHeader {
     @CFNotNull
     private String topic;
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
index 4b5c62b..f9478e4 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
@@ -25,7 +25,7 @@ public class RequestBuilder {
         }
         try {
             RpcRequestHeader requestHeader = (RpcRequestHeader) requestHeaderClass.newInstance();
-            requestHeader.setOneway(oneway);
+            requestHeader.setOway(oneway);
             requestHeader.setBname(destBrokerName);
             return requestHeader;
         } catch (Throwable t) {
@@ -37,26 +37,26 @@ public class RequestBuilder {
         return buildTopicQueueRequestHeader(requestCode, null, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), null);
     }
 
-    public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, MessageQueue mq, Boolean physical) {
-        return buildTopicQueueRequestHeader(requestCode, null, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), physical);
+    public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, MessageQueue mq, Boolean logic) {
+        return buildTopicQueueRequestHeader(requestCode, null, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), logic);
     }
 
-    public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, Boolean oneway, MessageQueue mq, Boolean physical) {
-        return buildTopicQueueRequestHeader(requestCode, oneway, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), physical);
+    public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, Boolean oneway, MessageQueue mq, Boolean logic) {
+        return buildTopicQueueRequestHeader(requestCode, oneway, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), logic);
     }
 
-    public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode,  Boolean oneway, String destBrokerName, String topic, int queueId, Boolean physical) {
+    public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode,  Boolean oneway, String destBrokerName, String topic, int queueId, Boolean logic) {
         Class requestHeaderClass = requestCodeMap.get(requestCode);
         if (requestHeaderClass == null) {
             throw new UnsupportedOperationException("unknown " + requestCode);
         }
         try {
             TopicQueueRequestHeader requestHeader = (TopicQueueRequestHeader) requestHeaderClass.newInstance();
-            requestHeader.setOneway(oneway);
+            requestHeader.setOway(oneway);
             requestHeader.setBname(destBrokerName);
             requestHeader.setTopic(topic);
             requestHeader.setQueueId(queueId);
-            requestHeader.setPhysical(physical);
+            requestHeader.setLo(logic);
             return requestHeader;
         } catch (Throwable t) {
             throw new RuntimeException(t);
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
index eaa22be..97879d1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
@@ -214,6 +214,7 @@ public class RpcClientImpl implements RpcClient {
             }
             case ResponseCode.QUERY_NOT_FOUND: {
                 rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), null, null));
+                break;
             }
             default:{
                 rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java
index 577865e..593df81 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java
@@ -20,13 +20,13 @@ import org.apache.rocketmq.remoting.CommandCustomHeader;
 
 public abstract class RpcRequestHeader implements CommandCustomHeader {
     //the namespace name
-    protected String namespace;
+    protected String ns;
     //if the data has been namespaced
-    protected Boolean namespaced;
+    protected Boolean nsd;
     //the abstract remote addr name, usually the physical broker name
     protected String bname;
-
-    protected Boolean oneway;
+    //oneway
+    protected Boolean oway;
 
     public String getBname() {
         return bname;
@@ -36,27 +36,27 @@ public abstract class RpcRequestHeader implements CommandCustomHeader {
         this.bname = bname;
     }
 
-    public Boolean getOneway() {
-        return oneway;
+    public Boolean getOway() {
+        return oway;
     }
 
-    public void setOneway(Boolean oneway) {
-        this.oneway = oneway;
+    public void setOway(Boolean oway) {
+        this.oway = oway;
     }
 
-    public String getNamespace() {
-        return namespace;
+    public String getNs() {
+        return ns;
     }
 
-    public void setNamespace(String namespace) {
-        this.namespace = namespace;
+    public void setNs(String ns) {
+        this.ns = ns;
     }
 
-    public Boolean getNamespaced() {
-        return namespaced;
+    public Boolean getNsd() {
+        return nsd;
     }
 
-    public void setNamespaced(Boolean namespaced) {
-        this.namespaced = namespaced;
+    public void setNsd(Boolean nsd) {
+        this.nsd = nsd;
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java
index 5d0a151..660f046 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java
@@ -16,20 +16,8 @@
  */
 package org.apache.rocketmq.common.rpc;
 
-public abstract class TopicQueueRequestHeader extends RpcRequestHeader {
-    //Physical or Logical
-    protected Boolean physical;
+public abstract class TopicQueueRequestHeader extends TopicRequestHeader {
 
-    public Boolean getPhysical() {
-        return physical;
-    }
-
-    public void setPhysical(Boolean physical) {
-        this.physical = physical;
-    }
-
-    public abstract String getTopic();
-    public abstract void setTopic(String topic);
     public abstract Integer getQueueId();
     public abstract void setQueueId(Integer queueId);
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/TopicRequestHeader.java
similarity index 71%
copy from common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java
copy to common/src/main/java/org/apache/rocketmq/common/rpc/TopicRequestHeader.java
index 5d0a151..a70cded 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/TopicRequestHeader.java
@@ -16,21 +16,17 @@
  */
 package org.apache.rocketmq.common.rpc;
 
-public abstract class TopicQueueRequestHeader extends RpcRequestHeader {
-    //Physical or Logical
-    protected Boolean physical;
-
-    public Boolean getPhysical() {
-        return physical;
-    }
-
-    public void setPhysical(Boolean physical) {
-        this.physical = physical;
-    }
+public abstract class TopicRequestHeader extends RpcRequestHeader {
+    //"lo" is short for "logical"
+    protected Boolean lo;
 
     public abstract String getTopic();
     public abstract void setTopic(String topic);
-    public abstract Integer getQueueId();
-    public abstract void setQueueId(Integer queueId);
 
+    public Boolean getLo() {
+        return lo;
+    }
+    public void setLo(Boolean lo) {
+        this.lo = lo;
+    }
 }
diff --git a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
index fdefb06..e89dc3a 100644
--- a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
@@ -1,9 +1,12 @@
 package org.apache.rocketmq.test.statictopic;
 
-import com.alibaba.fastjson.JSON;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import org.apache.log4j.Logger;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.rpc.ClientMetadata;
@@ -86,71 +89,35 @@ public class StaticTopicIT extends BaseConf {
 
     }
 
-
-    @Test
-    public void testCreateProduceConsumeStaticTopic() throws Exception {
-        String topic = "static" + MQRandomUtils.getRandomTopic();
-        RMQNormalProducer producer = getProducer(nsAddr, topic);
-        RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
-
-        int queueNum = 10;
-        int msgEachQueue = 100;
-        //create static topic
-        Map<String, TopicConfigAndQueueMapping> localBrokerConfigMap = MQAdminTestUtils.createStaticTopic(topic, queueNum, getBrokers(), defaultMQAdminExt);
-        //check the static topic config
-        {
-            Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
-            Assert.assertEquals(brokerNum, remoteBrokerConfigMap.size());
-            for (Map.Entry<String, TopicConfigAndQueueMapping> entry: remoteBrokerConfigMap.entrySet())  {
-                String broker = entry.getKey();
-                TopicConfigAndQueueMapping configMapping = entry.getValue();
-                TopicConfigAndQueueMapping localConfigMapping = localBrokerConfigMap.get(broker);
-                Assert.assertNotNull(localConfigMapping);
-                Assert.assertEquals(configMapping, localConfigMapping);
-            }
-            TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap);
-            Map<Integer, TopicQueueMappingOne>  globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
-            Assert.assertEquals(queueNum, globalIdMap.size());
-        }
-        //check the route data
+    private void sendMessagesAndCheck(RMQNormalProducer producer, Set<String> targetBrokers, String topic, int queueNum, int msgEachQueue, int gen) throws Exception {
+        ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt);
         List<MessageQueue> messageQueueList = producer.getMessageQueue();
         Assert.assertEquals(queueNum, messageQueueList.size());
-        producer.setDebug(true);
         for (int i = 0; i < queueNum; i++) {
             MessageQueue messageQueue = messageQueueList.get(i);
             Assert.assertEquals(topic, messageQueue.getTopic());
-            Assert.assertEquals(i, messageQueue.getQueueId());
             Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
+            Assert.assertEquals(i, messageQueue.getQueueId());
+            String destBrokerName = clientMetadata.getBrokerNameFromMessageQueue(messageQueue);
+            Assert.assertTrue(targetBrokers.contains(destBrokerName));
         }
-        //send and consume the msg
         for(MessageQueue messageQueue: messageQueueList) {
             producer.send(msgEachQueue, messageQueue);
         }
+        Assert.assertEquals(0, producer.getSendErrorMsg().size());
         //leave the time to build the cq
-        Thread.sleep(500);
+        Thread.sleep(100);
         for(MessageQueue messageQueue: messageQueueList) {
             Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
-            Assert.assertEquals(msgEachQueue, defaultMQAdminExt.maxOffset(messageQueue));
+            Assert.assertEquals(msgEachQueue + gen * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, defaultMQAdminExt.maxOffset(messageQueue));
         }
-        Assert.assertEquals(msgEachQueue * queueNum, producer.getAllOriginMsg().size());
-        Assert.assertEquals(0, producer.getSendErrorMsg().size());
-
-        consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 3000);
-        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
-                consumer.getListener().getAllMsgBody()))
-                .containsExactlyElementsIn(producer.getAllMsgBody());
-        Map<Integer, List<MessageExt>> messagesByQueue = computeMessageByQueue(consumer.getListener().getAllOriginMsg());
-        Assert.assertEquals(queueNum, messagesByQueue.size());
-        for (int i = 0; i < queueNum; i++) {
-            List<MessageExt> messageExts = messagesByQueue.get(i);
-            Assert.assertEquals(msgEachQueue, messageExts.size());
-            for (int j = 0; j < msgEachQueue; j++) {
-                Assert.assertEquals(j, messageExts.get(j).getQueueOffset());
-            }
+        TopicStatsTable topicStatsTable = defaultMQAdminExt.examineTopicStats(topic);
+        for(MessageQueue messageQueue: messageQueueList) {
+            Assert.assertEquals(0, topicStatsTable.getOffsetTable().get(messageQueue).getMinOffset());
+            Assert.assertEquals(msgEachQueue + gen * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, topicStatsTable.getOffsetTable().get(messageQueue).getMaxOffset());
         }
     }
 
-
     private Map<Integer, List<MessageExt>> computeMessageByQueue(Collection<Object> msgs) {
         Map<Integer, List<MessageExt>> messagesByQueue = new HashMap<>();
         for (Object object : msgs) {
@@ -171,217 +138,133 @@ public class StaticTopicIT extends BaseConf {
         return messagesByQueue;
     }
 
-    @Test
-    public void testDoubleReadCheckConsumerOffset() throws Exception {
-        String topic = "static" + MQRandomUtils.getRandomTopic();
-        String group = initConsumerGroup();
-        RMQNormalProducer producer = getProducer(nsAddr, topic);
+    private void consumeMessagesAndCheck(RMQNormalProducer producer, RMQNormalConsumer consumer, String topic, int queueNum, int msgEachQueue, int startGen, int genNum) {
+        consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 30000);
+        /*System.out.println("produce:" + producer.getAllMsgBody().size());
+        System.out.println("consume:" + consumer.getListener().getAllMsgBody().size());*/
 
-        RMQNormalConsumer consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
-
-        //System.out.printf("Group:%s\n", consumer.getConsumerGroup());
-        //System.out.printf("Topic:%s\n", topic);
-
-        int queueNum = 10;
-        int msgEachQueue = 100;
-        //create static topic
-        {
-            Set<String> targetBrokers = new HashSet<>();
-            targetBrokers.add(broker1Name);
-            MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
-        }
-        //produce the messages
-        {
-            List<MessageQueue> messageQueueList = producer.getMessageQueue();
-            for(MessageQueue messageQueue: messageQueueList) {
-                producer.send(msgEachQueue, messageQueue);
-            }
-            Assert.assertEquals(0, producer.getSendErrorMsg().size());
-            Assert.assertEquals(msgEachQueue * queueNum, producer.getAllMsgBody().size());
-        }
-
-        consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 3000);
         assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
                 consumer.getListener().getAllMsgBody()))
                 .containsExactlyElementsIn(producer.getAllMsgBody());
-        producer.shutdown();
-        consumer.shutdown();
+        Map<Integer, List<MessageExt>> messagesByQueue = computeMessageByQueue(consumer.getListener().getAllOriginMsg());
+        Assert.assertEquals(queueNum, messagesByQueue.size());
+        for (int i = 0; i < queueNum; i++) {
+            List<MessageExt> messageExts = messagesByQueue.get(i);
+            /*for (MessageExt messageExt:messageExts) {
+                System.out.printf("%d %d\n", messageExt.getQueueId(), messageExt.getQueueOffset());
+            }*/
+            int totalEachQueue = msgEachQueue * genNum;
+            Assert.assertEquals(totalEachQueue, messageExts.size());
+            for (int j = 0; j < totalEachQueue; j++) {
+                MessageExt messageExt = messageExts.get(j);
+                int currGen = startGen + j / msgEachQueue;
+                Assert.assertEquals(topic, messageExt.getTopic());
+                Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageExt.getBrokerName());
+                Assert.assertEquals(i, messageExt.getQueueId());
+                Assert.assertEquals((j % msgEachQueue) + currGen * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, messageExt.getQueueOffset());
+            }
+        }
+    }
 
-        //remapping the static topic
-        {
-            Set<String> targetBrokers = new HashSet<>();
-            targetBrokers.add(broker2Name);
-            MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
 
-        }
-        //make the metadata
-        Thread.sleep(500);
-        //System.out.printf("Group:%s\n", consumer.getConsumerGroup());
+    @Test
+    public void testCreateProduceConsumeStaticTopic() throws Exception {
+        String topic = "static" + MQRandomUtils.getRandomTopic();
+        RMQNormalProducer producer = getProducer(nsAddr, topic);
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
 
+        int queueNum = 10;
+        int msgEachQueue = 100;
+        //create static topic
+        Map<String, TopicConfigAndQueueMapping> localBrokerConfigMap = MQAdminTestUtils.createStaticTopic(topic, queueNum, getBrokers(), defaultMQAdminExt);
+        //check the static topic config
         {
-            producer = getProducer(nsAddr, topic);
-            ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt);
-            //just refresh the metadata
-            List<MessageQueue> messageQueueList = producer.getMessageQueue();
-            for(MessageQueue messageQueue: messageQueueList) {
-                producer.send(msgEachQueue, messageQueue);
-                Assert.assertEquals(broker2Name, clientMetadata.getBrokerNameFromMessageQueue(messageQueue));
-            }
-            Assert.assertEquals(0, producer.getSendErrorMsg().size());
-            Assert.assertEquals(msgEachQueue * queueNum, producer.getAllMsgBody().size());
-            for(MessageQueue messageQueue: messageQueueList) {
-                Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
-                Assert.assertEquals(msgEachQueue + TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, defaultMQAdminExt.maxOffset(messageQueue));
-            }
-            //leave the time to build the cq
-            Thread.sleep(100);
-        }
-        {
-            consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
-            consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 6000);
-            //System.out.printf("Consume %d\n", consumer.getListener().getAllMsgBody().size());
-            assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
-                    consumer.getListener().getAllMsgBody()))
-                    .containsExactlyElementsIn(producer.getAllMsgBody());
-
-            Map<Integer, List<MessageExt>> messagesByQueue = computeMessageByQueue(consumer.getListener().getAllOriginMsg());
-
-            Assert.assertEquals(queueNum, messagesByQueue.size());
-            for (int i = 0; i < queueNum; i++) {
-                List<MessageExt> messageExts = messagesByQueue.get(i);
-                Assert.assertEquals(msgEachQueue, messageExts.size());
-                for (int j = 0; j < msgEachQueue; j++) {
-                    Assert.assertEquals(j + TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, messageExts.get(j).getQueueOffset());
-                }
+            Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
+            Assert.assertEquals(brokerNum, remoteBrokerConfigMap.size());
+            for (Map.Entry<String, TopicConfigAndQueueMapping> entry: remoteBrokerConfigMap.entrySet())  {
+                String broker = entry.getKey();
+                TopicConfigAndQueueMapping configMapping = entry.getValue();
+                TopicConfigAndQueueMapping localConfigMapping = localBrokerConfigMap.get(broker);
+                Assert.assertNotNull(localConfigMapping);
+                Assert.assertEquals(configMapping, localConfigMapping);
             }
+            TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap);
+            Map<Integer, TopicQueueMappingOne>  globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
+            Assert.assertEquals(queueNum, globalIdMap.size());
         }
+        //send and check
+        sendMessagesAndCheck(producer, getBrokers(), topic, queueNum, msgEachQueue, 0);
+        //consume and check
+        consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 1);
     }
 
+
     @Test
     public void testRemappingProduceConsumeStaticTopic() throws Exception {
         String topic = "static" + MQRandomUtils.getRandomTopic();
         RMQNormalProducer producer = getProducer(nsAddr, topic);
         RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
 
-
-        int queueNum = 10;
+        int queueNum = 1;
         int msgEachQueue = 100;
-        //create static topic
+        //create the static topic, then send and consume
         {
-            Set<String> targetBrokers = new HashSet<>();
-            targetBrokers.add(broker1Name);
+            Set<String> targetBrokers = ImmutableSet.of(broker1Name);
             MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
+            sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 0);
+            consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 1);
         }
-        //System.out.printf("%s %s\n", broker1Name, clientMetadata.findMasterBrokerAddr(broker1Name));
-        //System.out.printf("%s %s\n", broker2Name, clientMetadata.findMasterBrokerAddr(broker2Name));
-
-        //produce the messages
-        {
-            List<MessageQueue> messageQueueList = producer.getMessageQueue();
-            for (int i = 0; i < queueNum; i++) {
-                MessageQueue messageQueue = messageQueueList.get(i);
-                Assert.assertEquals(i, messageQueue.getQueueId());
-                Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
-            }
-            for(MessageQueue messageQueue: messageQueueList) {
-                producer.send(msgEachQueue, messageQueue);
-            }
-            Assert.assertEquals(0, producer.getSendErrorMsg().size());
-            //leave the time to build the cq
-            Thread.sleep(100);
-            for(MessageQueue messageQueue: messageQueueList) {
-                //Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
-                Assert.assertEquals(msgEachQueue, defaultMQAdminExt.maxOffset(messageQueue));
-            }
-        }
-
-        consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 3000);
-        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
-                consumer.getListener().getAllMsgBody()))
-                .containsExactlyElementsIn(producer.getAllMsgBody());
-
+        System.out.println("=============================================================");
         //remapping the static topic
         {
-            Set<String> targetBrokers = new HashSet<>();
-            targetBrokers.add(broker2Name);
+            Set<String> targetBrokers = ImmutableSet.of(broker2Name);
             MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
             Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
-
             TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap);
             Map<Integer, TopicQueueMappingOne>  globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
             Assert.assertEquals(queueNum, globalIdMap.size());
             for (TopicQueueMappingOne mappingOne: globalIdMap.values()) {
                 Assert.assertEquals(broker2Name, mappingOne.getBname());
+                Assert.assertEquals(TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, mappingOne.getItems().get(mappingOne.getItems().size() - 1).getLogicOffset());
             }
-        }
-        //leave the time to refresh the metadata
-        Thread.sleep(500);
-        producer.setDebug(true);
-        {
-            ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt);
-            List<MessageQueue> messageQueueList = producer.getMessageQueue();
-            for (int i = 0; i < queueNum; i++) {
-                MessageQueue messageQueue = messageQueueList.get(i);
-                Assert.assertEquals(i, messageQueue.getQueueId());
-                Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
-                String destBrokerName = clientMetadata.getBrokerNameFromMessageQueue(messageQueue);
-                Assert.assertEquals(destBrokerName, broker2Name);
-            }
-
-            for(MessageQueue messageQueue: messageQueueList) {
-                producer.send(msgEachQueue, messageQueue);
-            }
-            Assert.assertEquals(0, producer.getSendErrorMsg().size());
-            //leave the time to build the cq
-            Thread.sleep(100);
-            for(MessageQueue messageQueue: messageQueueList) {
-                Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
-                Assert.assertEquals(msgEachQueue + TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, defaultMQAdminExt.maxOffset(messageQueue));
-            }
-        }
-        {
-            consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 30000);
-            assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
-                    consumer.getListener().getAllMsgBody()))
-                    .containsExactlyElementsIn(producer.getAllMsgBody());
-            Map<Integer, List<MessageExt>> messagesByQueue = computeMessageByQueue(consumer.getListener().getAllOriginMsg());
-            Assert.assertEquals(queueNum, messagesByQueue.size());
-            for (int i = 0; i < queueNum; i++) {
-                List<MessageExt> messageExts = messagesByQueue.get(i);
-                Assert.assertEquals(msgEachQueue * 2, messageExts.size());
-                for (int j = 0; j < msgEachQueue; j++) {
-                    Assert.assertEquals(j, messageExts.get(j).getQueueOffset());
-                }
-                for (int j = msgEachQueue; j < msgEachQueue * 2; j++) {
-                    Assert.assertEquals(j + TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE - msgEachQueue, messageExts.get(j).getQueueOffset());
-                }
-            }
+            Thread.sleep(500);
+            sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 1);
+            consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 2);
         }
     }
 
 
-    public void sendMessagesAndCheck(RMQNormalProducer producer, String broker, String topic, int queueNum, int msgEachQueue, long baseOffset) throws Exception {
-        ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt);
-        List<MessageQueue> messageQueueList = producer.getMessageQueue();
-        Assert.assertEquals(queueNum, messageQueueList.size());
-        for (int i = 0; i < queueNum; i++) {
-            MessageQueue messageQueue = messageQueueList.get(i);
-            Assert.assertEquals(i, messageQueue.getQueueId());
-            Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
-            String destBrokerName = clientMetadata.getBrokerNameFromMessageQueue(messageQueue);
-            Assert.assertEquals(destBrokerName, broker);
-        }
+    @Test
+    public void testDoubleReadCheckConsumerOffset() throws Exception {
+        String topic = "static" + MQRandomUtils.getRandomTopic();
+        String group = initConsumerGroup();
+        RMQNormalProducer producer = getProducer(nsAddr, topic);
+        RMQNormalConsumer consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
 
-        for(MessageQueue messageQueue: messageQueueList) {
-            producer.send(msgEachQueue, messageQueue);
+        int queueNum = 10;
+        int msgEachQueue = 100;
+        //create static topic
+        {
+            Set<String> targetBrokers = ImmutableSet.of(broker1Name);
+            MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
+            sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 0);
+            consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 1);
         }
-        Assert.assertEquals(0, producer.getSendErrorMsg().size());
-        //leave the time to build the cq
-        Thread.sleep(100);
-        for(MessageQueue messageQueue: messageQueueList) {
-            Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
-            Assert.assertEquals(msgEachQueue + baseOffset, defaultMQAdminExt.maxOffset(messageQueue));
+        producer.shutdown();
+        consumer.shutdown();
+        //use a new producer
+        producer = getProducer(nsAddr, topic);
+
+        List<String> brokers = ImmutableList.of(broker2Name, broker3Name, broker1Name);
+        for (int i = 0; i < brokers.size(); i++) {
+            Set<String> targetBrokers = ImmutableSet.of(brokers.get(i));
+            MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
+            //leave the time to refresh the metadata
+            Thread.sleep(500);
+            sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, i + 1);
         }
+        consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
+        consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 1, brokers.size());
     }
 
 
@@ -393,32 +276,29 @@ public class StaticTopicIT extends BaseConf {
         int msgEachQueue = 100;
         //create to broker1Name
         {
-            Set<String> targetBrokers = new HashSet<>();
-            targetBrokers.add(broker1Name);
+            Set<String> targetBrokers = ImmutableSet.of(broker1Name);
             MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
             //leave the time to refresh the metadata
             Thread.sleep(500);
-            sendMessagesAndCheck(producer, broker1Name, topic, queueNum, msgEachQueue, 0L);
+            sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 0);
         }
 
         //remapping to broker2Name
         {
-            Set<String> targetBrokers = new HashSet<>();
-            targetBrokers.add(broker2Name);
+            Set<String> targetBrokers = ImmutableSet.of(broker2Name);
             MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
             //leave the time to refresh the metadata
             Thread.sleep(500);
-            sendMessagesAndCheck(producer, broker2Name, topic, queueNum, msgEachQueue, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE);
+            sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 1);
         }
 
         //remapping to broker3Name
         {
-            Set<String> targetBrokers = new HashSet<>();
-            targetBrokers.add(broker3Name);
+            Set<String> targetBrokers = ImmutableSet.of(broker3Name);
             MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
             //leave the time to refresh the metadata
             Thread.sleep(500);
-            sendMessagesAndCheck(producer, broker3Name, topic, queueNum, msgEachQueue, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE * 2);
+            sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 2);
         }
 
         // 1 -> 2 -> 3, currently 1 should not has any mappings
@@ -469,10 +349,7 @@ public class StaticTopicIT extends BaseConf {
             for (List<LogicQueueMappingItem> items : config3.getMappingDetail().getHostedQueues().values()) {
                 Assert.assertEquals(1, items.size());
             }
-
         }
-
-
     }
 
 
@@ -482,42 +359,18 @@ public class StaticTopicIT extends BaseConf {
         RMQNormalProducer producer = getProducer(nsAddr, topic);
         int queueNum = 10;
         int msgEachQueue = 100;
-        //create static topic
+        //create and send
         {
-            Set<String> targetBrokers = new HashSet<>();
-            targetBrokers.add(broker1Name);
+            Set<String> targetBrokers = ImmutableSet.of(broker1Name);
             MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
-        }
-        //System.out.printf("%s %s\n", broker1Name, clientMetadata.findMasterBrokerAddr(broker1Name));
-        //System.out.printf("%s %s\n", broker2Name, clientMetadata.findMasterBrokerAddr(broker2Name));
-
-        //produce the messages
-        {
-            List<MessageQueue> messageQueueList = producer.getMessageQueue();
-            for (int i = 0; i < queueNum; i++) {
-                MessageQueue messageQueue = messageQueueList.get(i);
-                Assert.assertEquals(i, messageQueue.getQueueId());
-                Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
-            }
-            for(MessageQueue messageQueue: messageQueueList) {
-                producer.send(msgEachQueue, messageQueue);
-            }
-            Assert.assertEquals(0, producer.getSendErrorMsg().size());
-            //leave the time to build the cq
-            Thread.sleep(100);
-            for(MessageQueue messageQueue: messageQueueList) {
-                //Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
-                Assert.assertEquals(msgEachQueue, defaultMQAdminExt.maxOffset(messageQueue));
-            }
+            sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 0);
         }
 
         //remapping the static topic with -1 logic offset
         {
-            Set<String> targetBrokers = new HashSet<>();
-            targetBrokers.add(broker2Name);
+            Set<String> targetBrokers = ImmutableSet.of(broker2Name);
             MQAdminTestUtils.remappingStaticTopicWithNegativeLogicOffset(topic, targetBrokers, defaultMQAdminExt);
             Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
-
             TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap);
             Map<Integer, TopicQueueMappingOne>  globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
             Assert.assertEquals(queueNum, globalIdMap.size());
@@ -525,32 +378,10 @@ public class StaticTopicIT extends BaseConf {
                 Assert.assertEquals(broker2Name, mappingOne.getBname());
                 Assert.assertEquals(-1, mappingOne.getItems().get(mappingOne.getItems().size() - 1).getLogicOffset());
             }
-        }
-        //leave the time to refresh the metadata
-        Thread.sleep(500);
-        producer.setDebug(true);
-        {
-            ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt);
-            List<MessageQueue> messageQueueList = producer.getMessageQueue();
-            for (int i = 0; i < queueNum; i++) {
-                MessageQueue messageQueue = messageQueueList.get(i);
-                Assert.assertEquals(i, messageQueue.getQueueId());
-                String destBrokerName = clientMetadata.getBrokerNameFromMessageQueue(messageQueue);
-                Assert.assertEquals(destBrokerName, broker2Name);
-            }
-
-            for(MessageQueue messageQueue: messageQueueList) {
-                producer.send(msgEachQueue, messageQueue);
-            }
-            Assert.assertEquals(0, producer.getSendErrorMsg().size());
-            Assert.assertEquals(queueNum * msgEachQueue * 2, producer.getAllOriginMsg().size());
-            //leave the time to build the cq
-            Thread.sleep(100);
-            for(MessageQueue messageQueue: messageQueueList) {
-                Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
-                //the max offset should still be msgEachQueue
-                Assert.assertEquals(msgEachQueue, defaultMQAdminExt.maxOffset(messageQueue));
-            }
+            //leave the time to refresh the metadata
+            Thread.sleep(500);
+            //the remapping used logic offset -1, so the max offset should still be msgEachQueue: gen is 0
+            sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 0);
         }
     }