You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/11 06:38:20 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Polish the topic queue mapping context, and process the conext for ConsumerManager

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new 32f58c7  Polish the topic queue mapping context, and process the conext for ConsumerManager
32f58c7 is described below

commit 32f58c79827f92d5f66f7e44bb38250ddb1a0585
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Nov 11 14:38:04 2021 +0800

    Polish the topic queue mapping context, and process the conext for ConsumerManager
---
 .../broker/processor/ConsumerManageProcessor.java  | 15 +++-
 .../broker/processor/PullMessageProcessor.java     | 38 +++-------
 .../broker/processor/SendMessageProcessor.java     | 71 +++++--------------
 .../broker/topic/TopicQueueMappingManager.java     | 81 ++++++++++++++++++++++
 .../rocketmq/common/TopicQueueMappingContext.java  | 20 +++++-
 .../rocketmq/common/TopicQueueMappingDetail.java   | 30 +++-----
 .../protocol/header/PullMessageRequestHeader.java  |  8 ++-
 .../header/QueryConsumerOffsetRequestHeader.java   |  8 ++-
 .../protocol/header/SendMessageRequestHeader.java  |  8 ++-
 .../header/UpdateConsumerOffsetRequestHeader.java  |  8 ++-
 .../rocketmq/remoting/TopicQueueRequestHeader.java | 25 +++++++
 .../remoting/protocol/RemotingCommand.java         |  7 ++
 12 files changed, 206 insertions(+), 113 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
index 77317a6..59ea6c6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -20,6 +20,7 @@ import io.netty.channel.ChannelHandlerContext;
 import java.util.List;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.common.TopicQueueMappingContext;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.protocol.RequestCode;
@@ -109,6 +110,11 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
         final UpdateConsumerOffsetRequestHeader requestHeader =
             (UpdateConsumerOffsetRequestHeader) request
                 .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
+        TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);
+        RemotingCommand rewriteResult  = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext);
+        if (rewriteResult != null) {
+            return rewriteResult;
+        }
         this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup(),
             requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
         response.setCode(ResponseCode.SUCCESS);
@@ -126,6 +132,12 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
             (QueryConsumerOffsetRequestHeader) request
                 .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);
 
+        TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);
+        RemotingCommand rewriteResult  = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext);
+        if (rewriteResult != null) {
+            return rewriteResult;
+        }
+
         long offset =
             this.brokerController.getConsumerOffsetManager().queryOffset(
                 requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
@@ -140,7 +152,8 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
                     requestHeader.getQueueId());
             if (minOffset <= 0
                 && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
-                requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
+                requestHeader.getTopic(), requestHeader.getQueueId(), 0)
+                && mappingContext.checkIfAsPhysical()) {
                 responseHeader.setOffset(0L);
                 response.setCode(ResponseCode.SUCCESS);
                 response.setRemark(null);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 8d7758a..79869b9 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -80,6 +80,8 @@ import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 
+import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
+
 public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private final BrokerController brokerController;
@@ -101,50 +103,26 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
     }
 
 
-    private RemotingCommand buildErrorResponse(int code, String remark) {
-        final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
-        response.setCode(code);
-        response.setRemark(remark);
-        return response;
-    }
-
-    private TopicQueueMappingContext buildTopicQueueMappingContext(PullMessageRequestHeader requestHeader) {
-        if (requestHeader.getPhysical() != null
-                && Boolean.TRUE.equals(requestHeader.getPhysical())) {
-            return null;
-        }
-        TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic());
-        if (mappingDetail == null) {
-            //it is not static topic
-            return null;
-        }
-        String topic = requestHeader.getTopic();
-        Integer globalId = requestHeader.getQueueId();
-        Long globalOffset = requestHeader.getQueueOffset();
-
-        LogicQueueMappingItem mappingItem = mappingDetail.findLogicQueueMappingItem(globalId, globalOffset);
-        return new TopicQueueMappingContext(topic, globalId, globalOffset, mappingDetail, mappingItem);
-    }
 
     private RemotingCommand rewriteRequestForStaticTopic(PullMessageRequestHeader requestHeader, TopicQueueMappingContext mappingContext) {
         try {
-            if (mappingContext == null) {
+            if (mappingContext.getMappingDetail() == null) {
                 return null;
             }
+            TopicQueueMappingDetail mappingDetail =  mappingContext.getMappingDetail();
             String topic = mappingContext.getTopic();
             Integer globalId = mappingContext.getGlobalId();
             Long globalOffset = mappingContext.getGlobalOffset();
 
             LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
             if (mappingItem == null) {
-                return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d cannot find mapping item in request process of current broker %s",
-                        topic, globalId, this.brokerController.getBrokerConfig().getBrokerName()));
+                return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d cannot find mapping item in request process of current broker %s", topic, globalId, mappingDetail.getBname()));
             }
 
             if (globalOffset < mappingItem.getStartOffset()) {
                 log.warn("{}-{} fetch offset {} smaller than the min mapping offset {}", topic, globalId, globalOffset, mappingItem.getStartOffset());
                 return buildErrorResponse(ResponseCode.PULL_OFFSET_MOVED, String.format("%s-%d fetch offset {} smaller than the min mapping offset {} in  broker %s",
-                        topic, globalId, globalOffset, mappingItem.getStartOffset(), this.brokerController.getBrokerConfig().getBrokerName()));
+                        topic, globalId, globalOffset, mappingItem.getStartOffset(), mappingDetail.getBname()));
             }
             //below are physical info
             String bname = mappingItem.getBname();
@@ -157,7 +135,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
                 requestHeader.setMaxMsgNums((int) Math.min(mappingItem.getEndOffset() - mappingItem.getStartOffset(), requestHeader.getMaxMsgNums()));
             }
 
-            if (this.brokerController.getBrokerConfig().getBrokerName().equals(bname)) {
+            if (mappingDetail.getBname().equals(bname)) {
                 //just let it go
                 return null;
             }
@@ -278,7 +256,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
             return response;
         }
 
-        TopicQueueMappingContext mappingContext = buildTopicQueueMappingContext(requestHeader);
+        TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false, requestHeader.getQueueOffset());
 
         {
             RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 52508a4..baa1024 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
 import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
+import org.apache.rocketmq.common.LogicQueueMappingItem;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
@@ -53,6 +54,7 @@ import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 import org.apache.rocketmq.common.sysflag.TopicSysFlag;
 import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.netty.RemotingResponseCallback;
@@ -63,6 +65,8 @@ import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.config.StorePathConfigHelper;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 
+import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
+
 public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
 
     private List<ConsumeMessageHook> consumeMessageHookList;
@@ -99,8 +103,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
                 if (requestHeader == null) {
                     return CompletableFuture.completedFuture(null);
                 }
-                TopicQueueMappingContext mappingContext = buildTopicQueueMappingContext(requestHeader);
-                RemotingCommand rewriteResult =  rewriteRequestForStaticTopic(requestHeader, mappingContext);
+                TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, true, Long.MAX_VALUE);
+                RemotingCommand rewriteResult =  this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext);
                 if (rewriteResult != null) {
                     return CompletableFuture.completedFuture(rewriteResult);
                 }
@@ -115,68 +119,27 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
     }
 
 
-
-    private RemotingCommand buildErrorResponse(int code, String remark) {
-        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
-        response.setCode(code);
-        response.setRemark(remark);
-        return response;
-    }
-
-    private TopicQueueMappingContext buildTopicQueueMappingContext(SendMessageRequestHeader requestHeader) {
-        if (requestHeader.getPhysical() != null
-                && Boolean.TRUE.equals(requestHeader.getPhysical())) {
-            return null;
-        }
-        TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic());
-        if (mappingDetail == null) {
-            //it is not static topic
-            return null;
-        }
-        return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, mappingDetail, null);
-    }
     /**
      * If the response is not null, it meets some errors
-     * @param requestHeader
      * @return
      */
-    private RemotingCommand rewriteRequestForStaticTopic(SendMessageRequestHeader requestHeader, TopicQueueMappingContext mappingContext) {
-        try {
-            if (mappingContext == null) {
-                return null;
-            }
-            TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
-            Integer phyQueueId = null;
-            //compatible with the old logic, but it fact, this should not happen
-            if (requestHeader.getQueueId() < 0) {
-                Iterator<Map.Entry<Integer, Integer>> it  = mappingDetail.getCurrIdMap().entrySet().iterator();
-                if (it.hasNext()) {
-                    phyQueueId = it.next().getValue();
-                }
-            } else {
-                phyQueueId = mappingDetail.getCurrIdMap().get(requestHeader.getQueueId());
-            }
-            if (phyQueueId == null) {
-                return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName()));
-            } else {
-                requestHeader.setQueueId(phyQueueId);
-                return null;
-            }
-        } catch (Throwable t) {
-            return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
-        }
-    }
 
-    private RemotingCommand rewriteResponseForStaticTopic(SendMessageRequestHeader requestHeader, SendMessageResponseHeader responseHeader,  TopicQueueMappingContext mappingContext) {
+
+    private RemotingCommand rewriteResponseForStaticTopic(SendMessageResponseHeader responseHeader,  TopicQueueMappingContext mappingContext) {
         try {
-            if (mappingContext == null) {
+            if (mappingContext.getMappingDetail() == null) {
                 return null;
             }
             TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
 
-            long staticLogicOffset = mappingDetail.computeStaticQueueOffset(mappingContext.getGlobalId(), responseHeader.getQueueOffset());
+            LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
+            if (mappingItem == null) {
+                return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
+            }
+            //no need to care the broker name
+            long staticLogicOffset = mappingItem.computeStaticQueueOffset(responseHeader.getQueueOffset());
             if (staticLogicOffset < 0) {
-                return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), responseHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName()));
+                return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
             }
             responseHeader.setQueueId(mappingContext.getGlobalId());
             responseHeader.setQueueOffset(staticLogicOffset);
@@ -626,7 +589,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
             responseHeader.setQueueId(queueIdInt);
             responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
 
-            RemotingCommand rewriteResult =  rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext);
+            RemotingCommand rewriteResult =  rewriteResponseForStaticTopic(responseHeader, mappingContext);
             if (rewriteResult != null) {
                 return rewriteResult;
             }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index 2f5d558..ae2e75d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -17,21 +17,32 @@
 package org.apache.rocketmq.broker.topic;
 
 import com.alibaba.fastjson.JSON;
+import com.google.common.collect.ImmutableList;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.BrokerPathConfigHelper;
 import org.apache.rocketmq.common.ConfigManager;
 import org.apache.rocketmq.common.DataVersion;
+import org.apache.rocketmq.common.LogicQueueMappingItem;
+import org.apache.rocketmq.common.TopicQueueMappingContext;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper;
 import org.apache.rocketmq.common.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
+
 public class TopicQueueMappingManager extends ConfigManager {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private static final long LOCK_TIMEOUT_MILLIS = 3000;
@@ -94,5 +105,75 @@ public class TopicQueueMappingManager extends ConfigManager {
         return dataVersion;
     }
 
+    public TopicQueueMappingContext buildTopicQueueMappingContext(TopicQueueRequestHeader requestHeader) {
+        return buildTopicQueueMappingContext(requestHeader, false, Long.MAX_VALUE);
+    }
+
+    //Do not return a null context
+    public TopicQueueMappingContext buildTopicQueueMappingContext(TopicQueueRequestHeader requestHeader, boolean selectOneWhenMiss,  Long globalOffset) {
+        if (requestHeader.getPhysical() != null
+                && Boolean.TRUE.equals(requestHeader.getPhysical())) {
+            return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null, null);
+        }
+        TopicQueueMappingDetail mappingDetail = getTopicQueueMapping(requestHeader.getTopic());
+        if (mappingDetail == null) {
+            //it is not static topic
+            return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null, null);
+        }
+        //If not find mappingItem, it encounters some errors
+        Integer globalId = requestHeader.getQueueId();
+        if (globalId < 0 && !selectOneWhenMiss) {
+            return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, globalOffset, mappingDetail, null, null);
+        }
+
+        if (globalId < 0) {
+            try {
+                if (!mappingDetail.getHostedQueues().isEmpty()) {
+                    //do not check
+                    globalId = mappingDetail.getHostedQueues().keySet().iterator().next();
+                }
+            } catch (Throwable ignored) {
+            }
+        }
+        if (globalId < 0) {
+            return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, globalOffset, mappingDetail, null, null);
+        }
+
+        ImmutableList<LogicQueueMappingItem> mappingItemList = null;
+        LogicQueueMappingItem mappingItem = null;
+
+        if (globalOffset == null
+                || Long.MAX_VALUE == globalOffset) {
+            mappingItemList = mappingDetail.getMappingInfo(globalId);
+            if (mappingItemList != null
+                && mappingItemList.size() > 0) {
+                mappingItem = mappingItemList.get(mappingItemList.size() - 1);
+            }
+        } else {
+            mappingItemList = mappingDetail.getMappingInfo(globalId);
+            mappingItem = TopicQueueMappingDetail.findLogicQueueMappingItem(mappingItemList, globalOffset);
+        }
+        return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, globalOffset, mappingDetail, mappingItemList, mappingItem);
+    }
+
+
+    public  RemotingCommand rewriteRequestForStaticTopic(TopicQueueRequestHeader requestHeader, TopicQueueMappingContext mappingContext) {
+        try {
+            if (mappingContext.getMappingDetail() == null) {
+                return null;
+            }
+            TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
+            LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
+            if (mappingItem == null
+                    || !mappingDetail.getBname().equals(mappingItem.getBname())) {
+                return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), mappingDetail.getBname()));
+            }
+            requestHeader.setQueueId(mappingItem.getQueueId());
+            return null;
+        } catch (Throwable t) {
+            return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
+        }
+    }
+
 
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingContext.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingContext.java
index 50ac43e..ca759a1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingContext.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingContext.java
@@ -16,21 +16,31 @@
  */
 package org.apache.rocketmq.common;
 
+import com.google.common.collect.ImmutableList;
+
 public class TopicQueueMappingContext  {
     private String topic;
     private Integer globalId;
     private Long globalOffset;
     private TopicQueueMappingDetail mappingDetail;
+    private ImmutableList<LogicQueueMappingItem> mappingItemList;
     private LogicQueueMappingItem mappingItem;
 
-    public TopicQueueMappingContext(String topic, Integer globalId, Long globalOffset, TopicQueueMappingDetail mappingDetail, LogicQueueMappingItem mappingItem) {
+    public TopicQueueMappingContext(String topic, Integer globalId, Long globalOffset, TopicQueueMappingDetail mappingDetail, ImmutableList<LogicQueueMappingItem> mappingItemList, LogicQueueMappingItem mappingItem) {
         this.topic = topic;
         this.globalId = globalId;
         this.globalOffset = globalOffset;
         this.mappingDetail = mappingDetail;
+        this.mappingItemList = mappingItemList;
         this.mappingItem = mappingItem;
     }
 
+    public boolean checkIfAsPhysical() {
+        return mappingDetail == null
+                || mappingItemList == null
+                || (mappingItemList.size() == 1 &&  mappingItemList.get(0).getLogicOffset() == 0);
+    }
+
     public String getTopic() {
         return topic;
     }
@@ -63,6 +73,14 @@ public class TopicQueueMappingContext  {
         this.mappingDetail = mappingDetail;
     }
 
+    public ImmutableList<LogicQueueMappingItem> getMappingItemList() {
+        return mappingItemList;
+    }
+
+    public void setMappingItemList(ImmutableList<LogicQueueMappingItem> mappingItemList) {
+        this.mappingItemList = mappingItemList;
+    }
+
     public LogicQueueMappingItem getMappingItem() {
         return mappingItem;
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
index 8d5c8e8..75f2c52 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentMap;
 public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
 
     // the mapping info in current broker, do not register to nameserver
+    // make sure this value is not null
     private ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
 
     public TopicQueueMappingDetail(String topic, int totalQueues, String bname) {
@@ -77,30 +78,14 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
         return tmpIdMap;
     }
 
-    public List<LogicQueueMappingItem> getMappingInfo(Integer globalId) {
+    public ImmutableList<LogicQueueMappingItem> getMappingInfo(Integer globalId) {
         return hostedQueues.get(globalId);
     }
 
 
-    public long computeStaticQueueOffset(Integer globalId, long physicalLogicOffset) {
-        List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId);
-        if (mappingItems == null
-                || mappingItems.isEmpty()) {
-            return -1;
-        }
-        if (bname.equals(mappingItems.get(mappingItems.size() - 1).getBname())) {
-            return mappingItems.get(mappingItems.size() - 1).computeStaticQueueOffset(physicalLogicOffset);
-        }
-        //Consider the "switch" process, reduce the error
-        if (mappingItems.size() >= 2
-            && bname.equals(mappingItems.get(mappingItems.size() - 2).getBname())) {
-            return mappingItems.get(mappingItems.size() - 2).computeStaticQueueOffset(physicalLogicOffset);
-        }
-        return -1;
-    }
 
-    public LogicQueueMappingItem findLogicQueueMappingItem(Integer globalId, long logicOffset) {
-        List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId);
+
+    public static LogicQueueMappingItem findLogicQueueMappingItem(ImmutableList<LogicQueueMappingItem> mappingItems, long logicOffset) {
         if (mappingItems == null
                 || mappingItems.isEmpty()) {
             return null;
@@ -143,4 +128,11 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
     public ConcurrentMap<Integer, ImmutableList<LogicQueueMappingItem>> getHostedQueues() {
         return hostedQueues;
     }
+
+    public boolean checkIfAsPhysical(Integer globalId) {
+        List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId);
+        return mappingItems == null
+                || (mappingItems.size() == 1
+                &&  mappingItems.get(0).getLogicOffset() == 0);
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
index 1bce01f..5407964 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
@@ -20,12 +20,12 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.remoting.RequestHeader;
+import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.annotation.CFNullable;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class PullMessageRequestHeader extends RequestHeader {
+public class PullMessageRequestHeader extends TopicQueueRequestHeader {
     @CFNotNull
     private String consumerGroup;
     @CFNotNull
@@ -60,18 +60,22 @@ public class PullMessageRequestHeader extends RequestHeader {
         this.consumerGroup = consumerGroup;
     }
 
+    @Override
     public String getTopic() {
         return topic;
     }
 
+    @Override
     public void setTopic(String topic) {
         this.topic = topic;
     }
 
+    @Override
     public Integer getQueueId() {
         return queueId;
     }
 
+    @Override
     public void setQueueId(Integer queueId) {
         this.queueId = queueId;
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java
index 3b7f627..e4e132e 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java
@@ -20,11 +20,11 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class QueryConsumerOffsetRequestHeader implements CommandCustomHeader {
+public class QueryConsumerOffsetRequestHeader extends TopicQueueRequestHeader {
     @CFNotNull
     private String consumerGroup;
     @CFNotNull
@@ -44,18 +44,22 @@ public class QueryConsumerOffsetRequestHeader implements CommandCustomHeader {
         this.consumerGroup = consumerGroup;
     }
 
+    @Override
     public String getTopic() {
         return topic;
     }
 
+    @Override
     public void setTopic(String topic) {
         this.topic = topic;
     }
 
+    @Override
     public Integer getQueueId() {
         return queueId;
     }
 
+    @Override
     public void setQueueId(Integer queueId) {
         this.queueId = queueId;
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
index f9dcbff..0ec9795 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
@@ -20,12 +20,12 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.remoting.RequestHeader;
+import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.annotation.CFNullable;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class SendMessageRequestHeader extends RequestHeader {
+public class SendMessageRequestHeader extends TopicQueueRequestHeader {
     @CFNotNull
     private String producerGroup;
     @CFNotNull
@@ -64,10 +64,12 @@ public class SendMessageRequestHeader extends RequestHeader {
         this.producerGroup = producerGroup;
     }
 
+    @Override
     public String getTopic() {
         return topic;
     }
 
+    @Override
     public void setTopic(String topic) {
         this.topic = topic;
     }
@@ -88,10 +90,12 @@ public class SendMessageRequestHeader extends RequestHeader {
         this.defaultTopicQueueNums = defaultTopicQueueNums;
     }
 
+    @Override
     public Integer getQueueId() {
         return queueId;
     }
 
+    @Override
     public void setQueueId(Integer queueId) {
         this.queueId = queueId;
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java
index 3f44db6..e17fe36 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java
@@ -20,11 +20,11 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class UpdateConsumerOffsetRequestHeader implements CommandCustomHeader {
+public class UpdateConsumerOffsetRequestHeader extends TopicQueueRequestHeader {
     @CFNotNull
     private String consumerGroup;
     @CFNotNull
@@ -46,18 +46,22 @@ public class UpdateConsumerOffsetRequestHeader implements CommandCustomHeader {
         this.consumerGroup = consumerGroup;
     }
 
+    @Override
     public String getTopic() {
         return topic;
     }
 
+    @Override
     public void setTopic(String topic) {
         this.topic = topic;
     }
 
+    @Override
     public Integer getQueueId() {
         return queueId;
     }
 
+    @Override
     public void setQueueId(Integer queueId) {
         this.queueId = queueId;
     }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/TopicQueueRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/TopicQueueRequestHeader.java
new file mode 100644
index 0000000..08c3fef
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/TopicQueueRequestHeader.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting;
+
+public abstract class TopicQueueRequestHeader extends   RequestHeader  {
+    public abstract String getTopic();
+    public abstract void setTopic(String topic);
+    public abstract Integer getQueueId();
+    public abstract void setQueueId(Integer queueId);
+
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index 51b6194..34a1790 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -110,6 +110,13 @@ public class RemotingCommand {
         return createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, "not set any response code", classHeader);
     }
 
+    public static RemotingCommand buildErrorResponse(int code, String remark) {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        response.setCode(code);
+        response.setRemark(remark);
+        return response;
+    }
+
     public static RemotingCommand createResponseCommand(int code, String remark,
         Class<? extends CommandCustomHeader> classHeader) {
         RemotingCommand cmd = new RemotingCommand();