You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/02 08:34:53 UTC

[rocketmq] 02/02: Finish the logic for double-read-check

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit fad17e1ecc4a8884f2a020dfcb4c271d018995c2
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Dec 2 16:34:33 2021 +0800

    Finish the logic for double-read-check
---
 .../broker/processor/AdminBrokerProcessor.java     |   1 +
 .../broker/processor/ConsumerManageProcessor.java  |  85 +++++++++++++-
 .../broker/topic/TopicQueueMappingManager.java     |   3 +-
 .../consumer/store/RemoteBrokerOffsetStore.java    |   6 +-
 .../client/exception/OffsetNotFoundException.java  |  15 +++
 .../rocketmq/client/impl/MQClientAPIImpl.java      |   5 +-
 .../client/impl/consumer/RebalancePushImpl.java    |   1 +
 .../apache/rocketmq/common/rpc/RpcClientImpl.java  |  29 +++++
 .../apache/rocketmq/test/smoke/StaticTopicIT.java  | 128 +++++++++++++++++----
 9 files changed, 242 insertions(+), 31 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 16bf677..33d3025 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -709,6 +709,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
 
         offset = mappingItem.computeStaticQueueOffsetUpToEnd(offset);
 
+
         final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class);
         final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader();
         responseHeader.setOffset(offset);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
index 930f171..aad9454 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -20,8 +20,14 @@ import io.netty.channel.ChannelHandlerContext;
 import java.util.List;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.rpc.RpcRequest;
+import org.apache.rocketmq.common.rpc.RpcResponse;
+import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
+import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
@@ -39,6 +45,8 @@ import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
+import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
+
 public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
 
@@ -111,6 +119,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
             (UpdateConsumerOffsetRequestHeader) request
                 .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
         TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);
+
         RemotingCommand rewriteResult  = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext);
         if (rewriteResult != null) {
             return rewriteResult;
@@ -122,6 +131,76 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
         return response;
     }
 
+
+    /**
+     * Answers a consumer-offset query for a static topic, or tells the caller to use the regular path.
+     *
+     * <p>Returns {@code null} when no answer is produced here: the topic has no mapping detail, or the
+     * request is to be served as a physical queue (the queue id in {@code requestHeader} is then
+     * replaced by the leader item's queue id before returning).
+     *
+     * <p>Otherwise a "double read" is done over the mapping items of the logic queue: the newest item is
+     * asked first and the one before it is asked only when the first has no offset. An item whose broker
+     * name equals this broker's is read from the local consumer offset manager; any other item is
+     * queried through the broker's rpc client.
+     *
+     * @param requestHeader the decoded query; its bname, queueId and physical flag are overwritten when
+     *                      the query is forwarded to another broker
+     * @param mappingContext the mapping context built for this request
+     * @return {@code null} to continue with the regular query path, otherwise the final response:
+     *         SUCCESS carrying the offset, QUERY_NOT_FOUND, NOT_LEADER_FOR_QUEUE or SYSTEM_ERROR
+     */
+    public RemotingCommand rewriteRequestForStaticTopic(QueryConsumerOffsetRequestHeader requestHeader, TopicQueueMappingContext mappingContext) {
+        try {
+            if (mappingContext.getMappingDetail() == null) {
+                return null;
+            }
+            TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
+            if (!mappingContext.isLeader()) {
+                return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), mappingDetail.getBname()));
+            }
+            if (mappingContext.checkIfAsPhysical()) {
+                //let it go
+                requestHeader.setQueueId(mappingContext.getLeaderItem().getQueueId());
+                return null;
+            }
+            //double read check
+            List<LogicQueueMappingItem> itemList = mappingContext.getMappingItemList();
+            //by default, it is -1
+            long offset = -1;
+            //double read, first from leader, then from second leader
+            for (int i = 1; i <= 2; i++) {
+                if (itemList.size() - i < 0) {
+                    break;
+                }
+                LogicQueueMappingItem mappingItem = itemList.get(itemList.size() - i);
+                if (mappingItem.getBname().equals(mappingDetail.getBname())) {
+                    offset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(), requestHeader.getTopic(), mappingItem.getQueueId());
+                } else {
+                    //maybe we need to reconstruct an object
+                    requestHeader.setBname(mappingItem.getBname());
+                    requestHeader.setQueueId(mappingItem.getQueueId());
+                    requestHeader.setPhysical(true);
+                    RpcRequest rpcRequest = new RpcRequest(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader, null);
+                    RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
+                    if (rpcResponse.getException() != null) {
+                        throw rpcResponse.getException();
+                    }
+                    if (rpcResponse.getCode() == ResponseCode.SUCCESS) {
+                        offset = ((QueryConsumerOffsetResponseHeader) rpcResponse.getHeader()).getOffset();
+                    } else if (rpcResponse.getCode() != ResponseCode.QUERY_NOT_FOUND) {
+                        //this should not happen
+                        throw new RuntimeException("Unknown response code " + rpcResponse.getCode());
+                    }
+                    //QUERY_NOT_FOUND is the code this method itself returns for a missing offset,
+                    //so it is the "not found" answer to expect from the other broker: try the next item
+                }
+                if (offset >= 0) {
+                    //found; the older item must not be consulted, it could overwrite the newer value
+                    break;
+                }
+            }
+            final RemotingCommand response = RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
+            final QueryConsumerOffsetResponseHeader responseHeader = (QueryConsumerOffsetResponseHeader) response.readCustomHeader();
+            if (offset >= 0) {
+                responseHeader.setOffset(offset);
+                response.setCode(ResponseCode.SUCCESS);
+                response.setRemark(null);
+            } else {
+                response.setCode(ResponseCode.QUERY_NOT_FOUND);
+                response.setRemark("Not found, maybe this group consumer boot first");
+            }
+            return response;
+        } catch (Throwable t) {
+            //report through the broker logger instead of stderr so the failure lands in the broker log
+            log.error("Failed to query consumer offset for static topic", t);
+            return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
+        }
+    }
+
     private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
         throws RemotingCommandException {
         final RemotingCommand response =
@@ -132,8 +211,9 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
             (QueryConsumerOffsetRequestHeader) request
                 .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);
 
+
         TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);
-        RemotingCommand rewriteResult  = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext);
+        RemotingCommand rewriteResult  = rewriteRequestForStaticTopic(requestHeader, mappingContext);
         if (rewriteResult != null) {
             return rewriteResult;
         }
@@ -152,8 +232,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
                     requestHeader.getQueueId());
             if (minOffset <= 0
                 && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
-                requestHeader.getTopic(), requestHeader.getQueueId(), 0)
-                && mappingContext.checkIfAsPhysical()) {
+                requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
                 responseHeader.setOffset(0L);
                 response.setCode(ResponseCode.SUCCESS);
                 response.setRemark(null);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index 1dd9cbf..9be3717 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -196,9 +196,8 @@ public class TopicQueueMappingManager extends ConfigManager {
             return new TopicQueueMappingContext(requestHeader.getTopic(), globalId,  mappingDetail, null, null);
         }
 
-        List<LogicQueueMappingItem> mappingItemList = null;
+        List<LogicQueueMappingItem> mappingItemList = TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId);
         LogicQueueMappingItem leaderItem = null;
-        mappingItemList = TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId);
         if (mappingItemList != null
                 && mappingItemList.size() > 0) {
             leaderItem = mappingItemList.get(mappingItemList.size() - 1);
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
index 7364856..91d12a0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
@@ -25,11 +25,13 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.exception.OffsetNotFoundException;
 import org.apache.rocketmq.client.impl.FindBrokerResult;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
@@ -94,7 +96,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
                         return brokerOffset;
                     }
                     // No offset in broker
-                    catch (MQBrokerException e) {
+                    catch (OffsetNotFoundException e) {
                         return -1;
                     }
                     //Other exceptions
@@ -108,7 +110,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
             }
         }
 
-        return -1;
+        return -3;
     }
 
     @Override
diff --git a/client/src/main/java/org/apache/rocketmq/client/exception/OffsetNotFoundException.java b/client/src/main/java/org/apache/rocketmq/client/exception/OffsetNotFoundException.java
new file mode 100644
index 0000000..c3d275f
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/exception/OffsetNotFoundException.java
@@ -0,0 +1,15 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.exception;
+
+/**
+ * Broker exception raised when a consumer-offset query is answered with a "not found" response code.
+ *
+ * <p>Being a subtype of {@link MQBrokerException} lets callers catch this case separately from
+ * other broker-side failures.
+ */
+public class OffsetNotFoundException extends MQBrokerException {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Creates an exception without response code, message or broker address. */
+    public OffsetNotFoundException() {
+    }
+
+    /**
+     * @param responseCode response code returned by the broker
+     * @param errorMessage remark returned by the broker
+     */
+    public OffsetNotFoundException(int responseCode, String errorMessage) {
+        super(responseCode, errorMessage);
+    }
+
+    /**
+     * @param responseCode response code returned by the broker
+     * @param errorMessage remark returned by the broker
+     * @param brokerAddr   address of the broker that answered
+     */
+    public OffsetNotFoundException(int responseCode, String errorMessage, String brokerAddr) {
+        super(responseCode, errorMessage, brokerAddr);
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 4feb225..7538866 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -30,6 +30,7 @@ import org.apache.rocketmq.client.consumer.PullStatus;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.exception.MQRedirectException;
+import org.apache.rocketmq.client.exception.OffsetNotFoundException;
 import org.apache.rocketmq.client.hook.SendMessageContext;
 import org.apache.rocketmq.client.impl.consumer.PullResultExt;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
@@ -1232,9 +1233,11 @@ public class MQClientAPIImpl {
             case ResponseCode.SUCCESS: {
                 QueryConsumerOffsetResponseHeader responseHeader =
                     (QueryConsumerOffsetResponseHeader) response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
-
                 return responseHeader.getOffset();
             }
+            // A missing consumer offset is reported by the broker as QUERY_NOT_FOUND (PULL_NOT_FOUND
+            // belongs to message pulling), so that is the code to translate into OffsetNotFoundException.
+            case ResponseCode.QUERY_NOT_FOUND: {
+                throw new OffsetNotFoundException(response.getCode(), response.getRemark(), addr);
+            }
             default:
                 break;
         }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index 09d1521..d28a046 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.ConsumeInitMode;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
index 4e00bc3..713bbf9 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
@@ -9,6 +9,7 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader;
 import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
 import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
 import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RemotingClient;
@@ -75,6 +76,9 @@ public class RpcClientImpl implements RpcClient {
                 case RequestCode.GET_EARLIEST_MSG_STORETIME:
                     rpcResponsePromise = handleGetEarliestMsgStoretime(addr, request, timeoutMs);
                     break;
+                case RequestCode.QUERY_CONSUMER_OFFSET:
+                    rpcResponsePromise = handleQueryConsumerOffset(addr, request, timeoutMs);
+                    break;
                 default:
                     throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getCode());
             }
@@ -176,6 +180,31 @@ public class RpcClientImpl implements RpcClient {
         return rpcResponsePromise;
     }
 
+
+
+    /**
+     * Sends a QUERY_CONSUMER_OFFSET request to {@code addr} and reports the outcome through a promise.
+     *
+     * <p>The call itself is synchronous ({@code invokeSync}); every branch below completes the promise
+     * before this method returns:
+     * <ul>
+     *   <li>SUCCESS: response code, decoded {@link QueryConsumerOffsetResponseHeader} and body;</li>
+     *   <li>QUERY_NOT_FOUND: response code only, so the caller can tell "no offset" from a failure;</li>
+     *   <li>any other code: a response wrapping an {@link RpcException} that carries that code.</li>
+     * </ul>
+     *
+     * @param addr          address of the broker to query
+     * @param rpcRequest    the request to convert into a remoting command
+     * @param timeoutMillis timeout passed to the synchronous invocation
+     * @return the completed promise
+     * @throws Exception whatever the command conversion, the invocation or the header decoding throws
+     */
+    public Promise<RpcResponse> handleQueryConsumerOffset(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
+        final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
+
+        RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
+        RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
+        assert responseCommand != null;
+        switch (responseCommand.getCode()) {
+            case ResponseCode.SUCCESS: {
+                QueryConsumerOffsetResponseHeader responseHeader =
+                        (QueryConsumerOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
+                rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()));
+                break;
+            }
+            case ResponseCode.QUERY_NOT_FOUND: {
+                rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), null, null));
+                // Must not fall through: the default branch would complete the promise a second time
+                // with an error response (NOTE(review): if Promise is Netty's, that second setSuccess
+                // throws IllegalStateException - confirm).
+                break;
+            }
+            default: {
+                rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
+                break;
+            }
+        }
+        return rpcResponsePromise;
+    }
+
     public Promise<RpcResponse> handleGetMinOffset(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
         final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
 
diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
index abe1eee..c1cc60b 100644
--- a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
@@ -28,6 +28,7 @@ import org.junit.Test;
 import sun.jvm.hotspot.runtime.aarch64.AARCH64CurrentFrameGuess;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -144,30 +145,123 @@ public class StaticTopicIT extends BaseConf {
         assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
                 consumer.getListener().getAllMsgBody()))
                 .containsExactlyElementsIn(producer.getAllMsgBody());
+        Map<Integer, List<MessageExt>> messagesByQueue = computeMessageByQueue(consumer.getListener().getAllOriginMsg());
+        Assert.assertEquals(queueNum, messagesByQueue.size());
+        for (int i = 0; i < queueNum; i++) {
+            List<MessageExt> messageExts = messagesByQueue.get(i);
+            Assert.assertEquals(msgEachQueue, messageExts.size());
+            for (int j = 0; j < msgEachQueue; j++) {
+                Assert.assertEquals(j, messageExts.get(j).getQueueOffset());
+            }
+        }
+    }
+
+
+    /**
+     * Groups consumed messages by queue id and sorts each group by ascending queue offset.
+     *
+     * @param msgs the consumed messages; every element is cast to {@link MessageExt}
+     * @return a map from queue id to that queue's messages in ascending queue-offset order
+     */
+    private Map<Integer, List<MessageExt>> computeMessageByQueue(Collection<Object> msgs) {
         Map<Integer, List<MessageExt>> messagesByQueue = new HashMap<>();
-        for (Object object : consumer.getListener().getAllOriginMsg()) {
+        for (Object object : msgs) {
             MessageExt messageExt = (MessageExt) object;
             if (!messagesByQueue.containsKey(messageExt.getQueueId())) {
                 messagesByQueue.put(messageExt.getQueueId(), new ArrayList<>());
             }
             messagesByQueue.get(messageExt.getQueueId()).add(messageExt);
         }
-        Assert.assertEquals(queueNum, messagesByQueue.size());
-        for (int i = 0; i < queueNum; i++) {
-            List<MessageExt> messageExts = messagesByQueue.get(i);
-            Assert.assertEquals(msgEachQueue, messageExts.size());
-            Collections.sort(messageExts, new Comparator<MessageExt>() {
+        for (List<MessageExt> msgEachQueue: messagesByQueue.values()) {
+            Collections.sort(msgEachQueue, new Comparator<MessageExt>() {
                 @Override
                 public int compare(MessageExt o1, MessageExt o2) {
-                    return (int) (o1.getQueueOffset() - o2.getQueueOffset());
+                    // Long.compare instead of casting the long difference to int: the cast can flip
+                    // the sign once two offsets differ by more than Integer.MAX_VALUE.
+                    return Long.compare(o1.getQueueOffset(), o2.getQueueOffset());
                 }
             });
-            for (int j = 0; j < msgEachQueue; j++) {
-                Assert.assertEquals(j, messageExts.get(j).getQueueOffset());
-            }
         }
+        return messagesByQueue;
     }
 
+    /**
+     * Checks that a consumer group keeps its position when a static topic is remapped to another broker.
+     *
+     * <p>Steps: create the static topic on broker1, produce {@code msgEachQueue} messages per queue and
+     * consume them; shut producer and consumer down; remap the topic to broker2 and produce a second
+     * batch; start a new consumer with the same group and assert that it receives exactly the second
+     * batch, with queue offsets starting at {@code TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE}.
+     *
+     * <p>NOTE(review): presumably this exercises the broker-side "double read" of the committed offset
+     * (the offset was committed before the remap, the query arrives after it) - confirm.
+     */
+    @Test
+    public void testDoubleReadCheck() throws Exception {
+        String topic = "static" + MQRandomUtils.getRandomTopic();
+        String group = initConsumerGroup();
+        RMQNormalProducer producer = getProducer(nsAddr, topic);
+
+        RMQNormalConsumer consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
+
+        //System.out.printf("Group:%s\n", consumer.getConsumerGroup());
+        //System.out.printf("Topic:%s\n", topic);
+
+        int queueNum = 10;
+        int msgEachQueue = 100;
+        //create static topic
+        {
+            Set<String> targetBrokers = new HashSet<>();
+            targetBrokers.add(broker1Name);
+            createStaticTopic(topic, queueNum, targetBrokers);
+        }
+        //produce the messages
+        {
+            List<MessageQueue> messageQueueList = producer.getMessageQueue();
+            for(MessageQueue messageQueue: messageQueueList) {
+                producer.send(msgEachQueue, messageQueue);
+            }
+            Assert.assertEquals(0, producer.getSendErrorMsg().size());
+            Assert.assertEquals(msgEachQueue * queueNum, producer.getAllMsgBody().size());
+        }
+
+        //first round: everything produced so far must be consumed before the clients are stopped
+        consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 3000);
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+                consumer.getListener().getAllMsgBody()))
+                .containsExactlyElementsIn(producer.getAllMsgBody());
+        producer.shutdown();
+        consumer.shutdown();
+
+        //remapping the static topic
+        {
+            Set<String> targetBrokers = new HashSet<>();
+            targetBrokers.add(broker2Name);
+            remappingStaticTopic(topic, targetBrokers);
+
+        }
+        //pause after the remapping (presumably so the new mapping metadata can take effect - confirm)
+        Thread.sleep(500);
+        //System.out.printf("Group:%s\n", consumer.getConsumerGroup());
+
+        //second round of producing, with a fresh producer; every queue must now resolve to broker2
+        {
+            producer = getProducer(nsAddr, topic);
+            //just refresh the metadata
+            defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
+            List<MessageQueue> messageQueueList = producer.getMessageQueue();
+            for(MessageQueue messageQueue: messageQueueList) {
+                producer.send(msgEachQueue, messageQueue);
+                Assert.assertEquals(broker2Name, clientMetadata.getBrokerNameFromMessageQueue(messageQueue));
+            }
+            Assert.assertEquals(0, producer.getSendErrorMsg().size());
+            Assert.assertEquals(msgEachQueue * queueNum, producer.getAllMsgBody().size());
+            //after the remap each queue is expected to span [0, msgEachQueue + DEFAULT_BLOCK_SEQ_SIZE)
+            for(MessageQueue messageQueue: messageQueueList) {
+                Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
+                Assert.assertEquals(msgEachQueue + TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, defaultMQAdminExt.maxOffset(messageQueue));
+            }
+            //leave the time to build the cq
+            Thread.sleep(100);
+        }
+        //second round of consuming: a new consumer in the SAME group
+        {
+            consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
+            consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 6000);
+            //System.out.printf("Consume %d\n", consumer.getListener().getAllMsgBody().size());
+            assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+                    consumer.getListener().getAllMsgBody()))
+                    .containsExactlyElementsIn(producer.getAllMsgBody());
+
+            Map<Integer, List<MessageExt>> messagesByQueue = computeMessageByQueue(consumer.getListener().getAllOriginMsg());
+
+            //exactly msgEachQueue messages per queue, and their offsets begin at DEFAULT_BLOCK_SEQ_SIZE,
+            //i.e. nothing from the first round was delivered to the restarted group again
+            Assert.assertEquals(queueNum, messagesByQueue.size());
+            for (int i = 0; i < queueNum; i++) {
+                List<MessageExt> messageExts = messagesByQueue.get(i);
+                Assert.assertEquals(msgEachQueue, messageExts.size());
+                for (int j = 0; j < msgEachQueue; j++) {
+                    Assert.assertEquals(j + TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, messageExts.get(j).getQueueOffset());
+                }
+            }
+        }
+    }
 
     @Test
     public void testRemappingProduceConsumeStaticTopic() throws Exception {
@@ -175,6 +269,7 @@ public class StaticTopicIT extends BaseConf {
         RMQNormalProducer producer = getProducer(nsAddr, topic);
         RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
 
+
         int queueNum = 10;
         int msgEachQueue = 100;
         //create static topic
@@ -254,24 +349,11 @@ public class StaticTopicIT extends BaseConf {
             assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
                     consumer.getListener().getAllMsgBody()))
                     .containsExactlyElementsIn(producer.getAllMsgBody());
-            Map<Integer, List<MessageExt>> messagesByQueue = new HashMap<>();
-            for (Object object : consumer.getListener().getAllOriginMsg()) {
-                MessageExt messageExt = (MessageExt) object;
-                if (!messagesByQueue.containsKey(messageExt.getQueueId())) {
-                    messagesByQueue.put(messageExt.getQueueId(), new ArrayList<>());
-                }
-                messagesByQueue.get(messageExt.getQueueId()).add(messageExt);
-            }
+            Map<Integer, List<MessageExt>> messagesByQueue = computeMessageByQueue(consumer.getListener().getAllOriginMsg());
             Assert.assertEquals(queueNum, messagesByQueue.size());
             for (int i = 0; i < queueNum; i++) {
                 List<MessageExt> messageExts = messagesByQueue.get(i);
                 Assert.assertEquals(msgEachQueue * 2, messageExts.size());
-                Collections.sort(messageExts, new Comparator<MessageExt>() {
-                    @Override
-                    public int compare(MessageExt o1, MessageExt o2) {
-                        return (int) (o1.getQueueOffset() - o2.getQueueOffset());
-                    }
-                });
                 for (int j = 0; j < msgEachQueue; j++) {
                     Assert.assertEquals(j, messageExts.get(j).getQueueOffset());
                 }