You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/06/27 08:58:41 UTC

[rocketmq] branch logical_queue updated: [RIP-21] submodule common & client & remoting

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch logical_queue
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/logical_queue by this push:
     new 7145584  [RIP-21] submodule common & client & remoting
     new 123f967  Merge pull request #3018 from ayanamist/feature-logicalqueue
7145584 is described below

commit 71455840a53922b2ee7303d882ebd6cb27feca61
Author: ayanamist <ay...@gmail.com>
AuthorDate: Wed Jun 23 17:31:25 2021 +0800

    [RIP-21] submodule common & client & remoting
---
 client/pom.xml                                     |  17 ++
 .../client/exception/MQBrokerException.java        |   6 +
 .../client/exception/MQRedirectException.java      |  29 +-
 .../apache/rocketmq/client/impl/MQAdminImpl.java   | 180 ++++++++++--
 .../rocketmq/client/impl/MQClientAPIImpl.java      | 208 ++++++++++++-
 .../impl/consumer/DefaultLitePullConsumerImpl.java |   5 +-
 .../impl/consumer/DefaultMQPullConsumerImpl.java   |   3 +
 .../client/impl/consumer/PullAPIWrapper.java       | 324 +++++++++++++++++++--
 .../impl/consumer/PullResultWithLogicalQueues.java |  97 ++++++
 .../client/impl/factory/MQClientInstance.java      | 105 ++++++-
 .../impl/producer/DefaultMQProducerImpl.java       | 231 ++++++++++++++-
 .../rocketmq/client/latency/MQFaultStrategy.java   |   2 +-
 .../rocketmq/client/producer/SendResult.java       |   9 +
 .../client/producer/SendResultForLogicalQueue.java |  46 +++
 .../hook/ConsumeMessageOpenTracingHookImpl.java    |   2 +-
 .../consumer/DefaultLitePullConsumerTest.java      |   4 +-
 .../DefaultMQPullConsumerLogicalQueueTest.java     | 248 ++++++++++++++++
 .../client/consumer/DefaultMQPushConsumerTest.java |  16 +-
 .../ConsumeMessageConcurrentlyServiceTest.java     |   2 +-
 .../DefaultMQProducerLogicalQueueTest.java         | 311 ++++++++++++++++++++
 .../client/producer/DefaultMQProducerTest.java     |  28 +-
 .../DefaultMQConsumerWithOpenTracingTest.java      |  13 +-
 .../trace/DefaultMQConsumerWithTraceTest.java      |   6 +-
 .../DefaultMQProducerWithOpenTracingTest.java      |  14 +-
 .../trace/DefaultMQProducerWithTraceTest.java      |  26 +-
 .../TransactionMQProducerWithOpenTracingTest.java  |  16 +-
 .../trace/TransactionMQProducerWithTraceTest.java  |  40 ++-
 common/pom.xml                                     |  17 ++
 .../org/apache/rocketmq/common/BrokerConfig.java   |  10 +
 .../org/apache/rocketmq/common/ConfigManager.java  |  11 +
 .../java/org/apache/rocketmq/common/MixAll.java    |  13 +
 .../org/apache/rocketmq/common/TopicConfig.java    |  10 +
 .../org/apache/rocketmq/common/TopicQueueId.java   |  54 ++++
 .../rocketmq/common/constant/LoggerName.java       |   1 +
 .../fastjson/GenericMapSuperclassDeserializer.java |  58 ++++
 .../rocketmq/common/message/MessageConst.java      |   2 +
 .../rocketmq/common/message/MessageQueue.java      |   6 +
 .../rocketmq/common/protocol/RequestCode.java      |  12 +
 .../rocketmq/common/protocol/ResponseCode.java     |   2 +-
 .../rocketmq/common/protocol/body/ClusterInfo.java |  14 +
 ...ateMessageQueueForLogicalQueueRequestBody.java} |  38 ++-
 ...zeWrapper.java => MigrateLogicalQueueBody.java} |  30 +-
 .../ReuseTopicLogicalQueueRequestBody.java}        |  43 +--
 .../SealTopicLogicalQueueRequestBody.java}         |  33 +--
 .../protocol/body/TopicConfigSerializeWrapper.java |  11 +
 ...UpdateTopicLogicalQueueMappingRequestBody.java} |  29 +-
 ...a => DeleteTopicLogicalQueueRequestHeader.java} |  11 +-
 .../protocol/header/GetMaxOffsetRequestHeader.java |  18 ++
 ...eader.java => GetTopicConfigRequestHeader.java} |  21 +-
 ...ueryTopicLogicalQueueMappingRequestHeader.java} |  11 +-
 .../header/namesrv/GetRouteInfoRequestHeader.java  |  20 ++
 .../protocol/route/LogicalQueueRouteData.java      | 283 ++++++++++++++++++
 .../common/protocol/route/LogicalQueuesInfo.java   | 165 +++++++++++
 .../protocol/route/LogicalQueuesInfoUnordered.java | 108 +++++++
 .../MessageQueueRouteState.java}                   |  32 +-
 .../common/protocol/route/TopicRouteData.java      |  47 ++-
 .../protocol/route/TopicRouteDataNameSrv.java      |  64 ++++
 .../rocketmq/common/sysflag/MessageSysFlag.java    |   1 +
 .../GenericMapSuperclassDeserializerTest.java      |  57 ++++
 .../common/protocol/route/TopicRouteDataTest.java  |  11 +-
 .../remoting/protocol/RemotingSerializable.java    |  10 +-
 61 files changed, 2928 insertions(+), 313 deletions(-)

diff --git a/client/pom.xml b/client/pom.xml
index 95ef461..53277e0 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -27,6 +27,19 @@
     <artifactId>rocketmq-client</artifactId>
     <name>rocketmq-client ${project.version}</name>
 
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>6</source>
+                    <target>6</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
     <dependencies>
         <dependency>
             <groupId>${project.groupId}</groupId>
@@ -73,5 +86,9 @@
             <artifactId>log4j-slf4j-impl</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
     </dependencies>
 </project>
diff --git a/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java b/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java
index f07a38b..7870ff1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java
+++ b/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java
@@ -25,6 +25,12 @@ public class MQBrokerException extends Exception {
     private final String errorMessage;
     private final String brokerAddr;
 
+    MQBrokerException() {
+        this.responseCode = 0;
+        this.errorMessage = null;
+        this.brokerAddr = null;
+    }
+
     public MQBrokerException(int responseCode, String errorMessage) {
         super(FAQUrl.attachDefaultURL("CODE: " + UtilAll.responseCode2String(responseCode) + "  DESC: "
                 + errorMessage));
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java b/client/src/main/java/org/apache/rocketmq/client/exception/MQRedirectException.java
similarity index 55%
copy from common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java
copy to client/src/main/java/org/apache/rocketmq/client/exception/MQRedirectException.java
index a2806e6..0364667 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java
+++ b/client/src/main/java/org/apache/rocketmq/client/exception/MQRedirectException.java
@@ -14,29 +14,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.rocketmq.client.exception;
 
-/**
- * $Id: GetRouteInfoRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
- */
-package org.apache.rocketmq.common.protocol.header.namesrv;
-
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.annotation.CFNotNull;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+public class MQRedirectException extends MQBrokerException {
+    private static final StackTraceElement[] UNASSIGNED_STACK = new StackTraceElement[0];
 
-public class GetRouteInfoRequestHeader implements CommandCustomHeader {
-    @CFNotNull
-    private String topic;
+    private final byte[] body;
 
-    @Override
-    public void checkFields() throws RemotingCommandException {
+    public MQRedirectException(byte[] responseBody) {
+        this.body = responseBody;
     }
 
-    public String getTopic() {
-        return topic;
+    // This exception class is used as a flow control item, so stack trace is useless and performance killer.
+    @Override public synchronized Throwable fillInStackTrace() {
+        this.setStackTrace(UNASSIGNED_STACK);
+        return this;
     }
 
-    public void setTopic(String topic) {
-        this.topic = topic;
+    public byte[] getBody() {
+        return body;
     }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
index 8884e4a..dce830c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
@@ -16,40 +16,34 @@
  */
 package org.apache.rocketmq.client.impl;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.exception.MQRedirectException;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
 import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.help.FAQUrl;
-import org.apache.rocketmq.common.protocol.NamespaceUtil;
-import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.message.MessageClientIDSetter;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageId;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.QueryMessageResponseHeader;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
+import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
@@ -57,6 +51,17 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.netty.ResponseFuture;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 public class MQAdminImpl {
 
     private final InternalLogger log = ClientLogger.getLog();
@@ -182,6 +187,10 @@ public class MQAdminImpl {
     }
 
     public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
+        LogicalQueueRouteData logicalQueueRouteData = searchLogicalQueueRouteByTimestamp(mq, timestamp);
+        if (logicalQueueRouteData != null) {
+            mq = logicalQueueRouteData.getMessageQueue();
+        }
         String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
         if (null == brokerAddr) {
             this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
@@ -190,8 +199,9 @@ public class MQAdminImpl {
 
         if (brokerAddr != null) {
             try {
-                return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timestamp,
+                long offset = this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timestamp,
                     timeoutMillis);
+                return correctLogicalQueueOffset(offset, logicalQueueRouteData);
             } catch (Exception e) {
                 throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
             }
@@ -201,24 +211,50 @@ public class MQAdminImpl {
     }
 
     public long maxOffset(MessageQueue mq) throws MQClientException {
-        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
-        if (null == brokerAddr) {
-            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
-            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
-        }
+        return this.maxOffset(mq, true);
+    }
 
-        if (brokerAddr != null) {
-            try {
-                return this.mQClientFactory.getMQClientAPIImpl().getMaxOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timeoutMillis);
-            } catch (Exception e) {
-                throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
+    public long maxOffset(MessageQueue mq, boolean committed) throws MQClientException {
+        final MessageQueue origMq = mq;
+        String topic = mq.getTopic();
+        LogicalQueueRouteData previousQueueRouteData = null;
+        for (int i = 0; i < 5; i++) {
+            LogicalQueueRouteData maxQueueRouteData = this.searchLogicalQueueRouteByOffset(origMq, Long.MAX_VALUE);
+            if (maxQueueRouteData != null) {
+                if (previousQueueRouteData != null && Objects.equal(previousQueueRouteData.getMessageQueue(), maxQueueRouteData.getMessageQueue())) {
+                    throw new MQClientException("Topic route info not latest", null);
+                }
+                previousQueueRouteData = maxQueueRouteData;
+                mq = maxQueueRouteData.getMessageQueue();
+            }
+            String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
+            if (null == brokerAddr) {
+                this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
+                brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
             }
-        }
 
-        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
+            if (brokerAddr != null) {
+                try {
+                    long offset = this.mQClientFactory.getMQClientAPIImpl().getMaxOffset(brokerAddr, topic, mq.getQueueId(), committed, maxQueueRouteData != null, timeoutMillis);
+                    return correctLogicalQueueOffset(offset, maxQueueRouteData);
+                } catch (MQRedirectException e) {
+                    this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, false, null, Collections.singleton(mq.getQueueId()));
+                    continue;
+                } catch (Exception e) {
+                    throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
+                }
+            }
+            throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
+        }
+        throw new MQClientException("Redirect exceed max times", null);
     }
 
     public long minOffset(MessageQueue mq) throws MQClientException {
+        LogicalQueueRouteData minQueueRouteData = searchLogicalQueueRouteByOffset(mq, 0L);
+        if (minQueueRouteData != null) {
+            mq = minQueueRouteData.getMessageQueue();
+        }
+
         String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
         if (null == brokerAddr) {
             this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
@@ -227,7 +263,8 @@ public class MQAdminImpl {
 
         if (brokerAddr != null) {
             try {
-                return this.mQClientFactory.getMQClientAPIImpl().getMinOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timeoutMillis);
+                long offset = this.mQClientFactory.getMQClientAPIImpl().getMinOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timeoutMillis);
+                return correctLogicalQueueOffset(offset, minQueueRouteData);
             } catch (Exception e) {
                 throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
             }
@@ -236,7 +273,29 @@ public class MQAdminImpl {
         throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
     }
 
+    private List<LogicalQueueRouteData> queryLogicalQueueRouteData(MessageQueue mq) {
+        if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(mq.getBrokerName())) {
+            TopicRouteData topicRouteData = this.mQClientFactory.queryTopicRouteData(mq.getTopic());
+            if (topicRouteData == null) {
+                this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
+                topicRouteData = this.mQClientFactory.queryTopicRouteData(mq.getTopic());
+            }
+            if (topicRouteData != null) {
+                LogicalQueuesInfo logicalQueuesInfo = topicRouteData.getLogicalQueuesInfo();
+                if (logicalQueuesInfo != null) {
+                    return logicalQueuesInfo.get(mq.getQueueId());
+                }
+            }
+        }
+        return null;
+    }
+
     public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
+        LogicalQueueRouteData minQueueRouteData = searchLogicalQueueRouteByOffset(mq, 0L);
+        if (minQueueRouteData != null) {
+            mq = minQueueRouteData.getMessageQueue();
+        }
+
         String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
         if (null == brokerAddr) {
             this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
@@ -445,4 +504,71 @@ public class MQAdminImpl {
 
         throw new MQClientException(ResponseCode.TOPIC_NOT_EXIST, "The topic[" + topic + "] not matched route info");
     }
+
+    private static long correctLogicalQueueOffset(long offset, LogicalQueueRouteData logicalQueueRouteData) {
+        if (logicalQueueRouteData == null) {
+            return offset;
+        }
+        return logicalQueueRouteData.toLogicalQueueOffset(offset);
+    }
+
+    private LogicalQueueRouteData searchLogicalQueueRouteByTimestamp(MessageQueue mq, long timestamp) {
+        List<LogicalQueueRouteData> queueRouteDataList = this.queryLogicalQueueRouteData(mq);
+        if (queueRouteDataList == null) {
+            return null;
+        }
+        LogicalQueueRouteData logicalQueueRouteData = null;
+        for (LogicalQueueRouteData el : queueRouteDataList) {
+            if (!el.isReadable()) {
+                continue;
+            }
+            if (logicalQueueRouteData == null && el.getFirstMsgTimeMillis() < 0) {
+                logicalQueueRouteData = el;
+            } else if (el.getFirstMsgTimeMillis() >= 0) {
+                if (el.getFirstMsgTimeMillis() <= timestamp && el.getLastMsgTimeMillis() >= timestamp) {
+                    logicalQueueRouteData = el;
+                    break;
+                }
+            }
+        }
+        if (logicalQueueRouteData == null) {
+            logicalQueueRouteData = queueRouteDataList.get(queueRouteDataList.size() - 1);
+        }
+        return logicalQueueRouteData;
+    }
+
+    private LogicalQueueRouteData searchLogicalQueueRouteByOffset(MessageQueue mq, long offset) {
+        List<LogicalQueueRouteData> queueRouteDataList = this.queryLogicalQueueRouteData(mq);
+        if (queueRouteDataList == null) {
+            return null;
+        }
+        {
+            List<LogicalQueueRouteData> list = Lists.newArrayListWithCapacity(queueRouteDataList.size());
+            for (LogicalQueueRouteData queueRouteData : queueRouteDataList) {
+                if (LogicalQueueRouteData.READABLE_PREDICT.apply(queueRouteData)) {
+                    list.add(queueRouteData);
+                }
+            }
+            queueRouteDataList = list;
+        }
+        if (queueRouteDataList.isEmpty()) {
+            return null;
+        }
+        if (offset <= 0) {
+            // min
+            return Collections.min(queueRouteDataList);
+        } else if (offset == Long.MAX_VALUE) {
+            // max
+            return Collections.max(queueRouteDataList);
+        }
+        Collections.sort(queueRouteDataList);
+        LogicalQueueRouteData searchKey = new LogicalQueueRouteData();
+        searchKey.setLogicalQueueDelta(offset);
+        int idx = Collections.binarySearch(queueRouteDataList, searchKey);
+        if (idx < 0) {
+            idx = -idx - 1;
+            idx -= 1;
+        }
+        return queueRouteDataList.get(idx);
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 63b2045..20bf884 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.client.impl;
 
+import com.google.common.base.Function;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -34,6 +35,7 @@ import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.PullStatus;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.exception.MQRedirectException;
 import org.apache.rocketmq.client.hook.SendMessageContext;
 import org.apache.rocketmq.client.impl.consumer.PullResultExt;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
@@ -71,21 +73,26 @@ import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
 import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
 import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
 import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.common.protocol.body.CreateMessageQueueForLogicalQueueRequestBody;
 import org.apache.rocketmq.common.protocol.body.GetConsumerStatusBody;
 import org.apache.rocketmq.common.protocol.body.GroupList;
 import org.apache.rocketmq.common.protocol.body.KVTable;
 import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
 import org.apache.rocketmq.common.protocol.body.LockBatchResponseBody;
+import org.apache.rocketmq.common.protocol.body.MigrateLogicalQueueBody;
 import org.apache.rocketmq.common.protocol.body.ProducerConnection;
 import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
 import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody;
 import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody;
 import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
 import org.apache.rocketmq.common.protocol.body.ResetOffsetBody;
+import org.apache.rocketmq.common.protocol.body.ReuseTopicLogicalQueueRequestBody;
+import org.apache.rocketmq.common.protocol.body.SealTopicLogicalQueueRequestBody;
 import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicList;
 import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
+import org.apache.rocketmq.common.protocol.body.UpdateTopicLogicalQueueMappingRequestBody;
 import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader;
 import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
 import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
@@ -93,6 +100,7 @@ import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeade
 import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader;
 import org.apache.rocketmq.common.protocol.header.DeleteAccessConfigRequestHeader;
 import org.apache.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequestHeader;
+import org.apache.rocketmq.common.protocol.header.DeleteTopicLogicalQueueRequestHeader;
 import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader;
 import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
 import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader;
@@ -111,6 +119,7 @@ import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader;
 import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
 import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
 import org.apache.rocketmq.common.protocol.header.GetProducerConnectionListRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader;
 import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader;
 import org.apache.rocketmq.common.protocol.header.GetTopicsByClusterRequestHeader;
 import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
@@ -122,6 +131,7 @@ import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHea
 import org.apache.rocketmq.common.protocol.header.QueryCorrectionOffsetHeader;
 import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.QueryTopicConsumeByWhoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.QueryTopicLogicalQueueMappingRequestHeader;
 import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
 import org.apache.rocketmq.common.protocol.header.ResumeCheckHalfMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
@@ -145,8 +155,13 @@ import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerR
 import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
+import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
+import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteDataNameSrv;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RPCHook;
@@ -164,6 +179,8 @@ import org.apache.rocketmq.remoting.protocol.LanguageCode;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
+import static com.google.common.base.Optional.fromNullable;
+
 public class MQClientAPIImpl {
 
     private final static InternalLogger log = ClientLogger.getLog();
@@ -556,9 +573,13 @@ public class MQClientAPIImpl {
 
                         producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                     } catch (Exception e) {
-                        producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
-                        onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
-                            retryTimesWhenSendFailed, times, e, context, false, producer);
+                        if (e instanceof MQRedirectException) {
+                            sendCallback.onException(e);
+                        } else {
+                            producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
+                            onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
+                                retryTimesWhenSendFailed, times, e, context, false, producer);
+                        }
                     }
                 } else {
                     producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
@@ -644,6 +665,11 @@ public class MQClientAPIImpl {
         final RemotingCommand response,
         final String addr
     ) throws MQBrokerException, RemotingCommandException {
+        HashMap<String, String> extFields = response.getExtFields();
+        if (extFields != null && extFields.containsKey(MessageConst.PROPERTY_REDIRECT)) {
+            throw new MQRedirectException(response.getBody());
+        }
+
         SendStatus sendStatus;
         switch (response.getCode()) {
             case ResponseCode.FLUSH_DISK_TIMEOUT: {
@@ -775,6 +801,11 @@ public class MQClientAPIImpl {
     private PullResult processPullResponse(
         final RemotingCommand response,
         final String addr) throws MQBrokerException, RemotingCommandException {
+        HashMap<String, String> extFields = response.getExtFields();
+        if (extFields != null && extFields.containsKey(MessageConst.PROPERTY_REDIRECT)) {
+            throw new MQRedirectException(response.getBody());
+        }
+
         PullStatus pullStatus = PullStatus.NO_NEW_MSG;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS:
@@ -789,7 +820,6 @@ public class MQClientAPIImpl {
             case ResponseCode.PULL_OFFSET_MOVED:
                 pullStatus = PullStatus.OFFSET_ILLEGAL;
                 break;
-
             default:
                 throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
         }
@@ -854,14 +884,27 @@ public class MQClientAPIImpl {
 
     public long getMaxOffset(final String addr, final String topic, final int queueId, final long timeoutMillis)
         throws RemotingException, MQBrokerException, InterruptedException {
+        return getMaxOffset(addr, topic, queueId, true, false, timeoutMillis);
+    }
+
+    public long getMaxOffset(final String addr, final String topic, final int queueId, boolean committed,
+        boolean fromLogicalQueue,
+        final long timeoutMillis)
+        throws RemotingException, MQBrokerException, InterruptedException {
         GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader();
         requestHeader.setTopic(topic);
         requestHeader.setQueueId(queueId);
+        requestHeader.setCommitted(committed);
+        requestHeader.setLogicalQueue(fromLogicalQueue);
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, requestHeader);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
             request, timeoutMillis);
         assert response != null;
+        HashMap<String, String> extFields = response.getExtFields();
+        if (extFields != null && extFields.containsKey(MessageConst.PROPERTY_REDIRECT)) {
+            throw new MQRedirectException(response.getBody());
+        }
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 GetMaxOffsetResponseHeader responseHeader =
@@ -1357,8 +1400,15 @@ public class MQClientAPIImpl {
 
     public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
         boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        return getTopicRouteInfoFromNameServer(topic, timeoutMillis, allowTopicNotExist, null);
+    }
+
+    public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
+        boolean allowTopicNotExist, Set<Integer> logicalQueueIdsFilter) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
         GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
         requestHeader.setTopic(topic);
+        requestHeader.setSysFlag(MessageSysFlag.LOGICAL_QUEUE_FLAG);
+        requestHeader.setLogicalQueueIdsFilter(logicalQueueIdsFilter);
 
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);
 
@@ -1375,7 +1425,11 @@ public class MQClientAPIImpl {
             case ResponseCode.SUCCESS: {
                 byte[] body = response.getBody();
                 if (body != null) {
-                    return TopicRouteData.decode(body, TopicRouteData.class);
+                    return fromNullable(RemotingSerializable.decode(body, TopicRouteDataNameSrv.class)).transform(new Function<TopicRouteDataNameSrv, TopicRouteData>() {
+                        @Override public TopicRouteData apply(TopicRouteDataNameSrv srv) {
+                            return srv.toTopicRouteData();
+                        }
+                    }).orNull();
                 }
             }
             default:
@@ -2263,4 +2317,148 @@ public class MQClientAPIImpl {
                 return false;
         }
     }
+
+    public LogicalQueuesInfo queryTopicLogicalQueue(String brokerAddr, String topic,
+        long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        QueryTopicLogicalQueueMappingRequestHeader requestHeader = new QueryTopicLogicalQueueMappingRequestHeader();
+        requestHeader.setTopic(topic);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_TOPIC_LOGICAL_QUEUE_MAPPING, requestHeader);
+        RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, timeoutMillis);
+        assert response != null;
+        if (response.getCode() != ResponseCode.SUCCESS) {
+            throw new MQBrokerException(response.getCode(), response.getRemark());
+        }
+        return RemotingSerializable.decode(response.getBody(), LogicalQueuesInfo.class);
+    }
+
+    public void updateTopicLogicalQueue(String brokerAddr, String topic, int queueId, int logicalQueueIndex,
+        long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_TOPIC_LOGICAL_QUEUE_MAPPING, null);
+        UpdateTopicLogicalQueueMappingRequestBody requestBody = new UpdateTopicLogicalQueueMappingRequestBody();
+        requestBody.setTopic(topic);
+        requestBody.setQueueId(queueId);
+        requestBody.setLogicalQueueIdx(logicalQueueIndex);
+        request.setBody(requestBody.encode());
+        RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, timeoutMillis);
+        assert response != null;
+        if (response.getCode() != ResponseCode.SUCCESS) {
+            throw new MQBrokerException(response.getCode(), response.getRemark());
+        }
+    }
+
+    public void deleteTopicLogicalQueueMapping(String brokerAddr, String topic, long timeoutMillis) throws MQBrokerException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        DeleteTopicLogicalQueueRequestHeader requestHeader = new DeleteTopicLogicalQueueRequestHeader();
+        requestHeader.setTopic(topic);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_LOGICAL_QUEUE_MAPPING, requestHeader);
+        RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, timeoutMillis);
+        assert response != null;
+        if (response.getCode() != ResponseCode.SUCCESS) {
+            throw new MQBrokerException(response.getCode(), response.getRemark());
+        }
+    }
+
+    public LogicalQueueRouteData sealTopicLogicalQueue(String brokerAddr, LogicalQueueRouteData queueRouteData, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEAL_TOPIC_LOGICAL_QUEUE, null);
+        SealTopicLogicalQueueRequestBody requestBody = new SealTopicLogicalQueueRequestBody();
+        MessageQueue messageQueue = queueRouteData.getMessageQueue();
+        requestBody.setTopic(messageQueue.getTopic());
+        requestBody.setQueueId(messageQueue.getQueueId());
+        requestBody.setLogicalQueueIndex(queueRouteData.getLogicalQueueIndex());
+        request.setBody(requestBody.encode());
+        RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, timeoutMillis);
+        assert response != null;
+        if (response.getCode() != ResponseCode.SUCCESS) {
+            throw new MQBrokerException(response.getCode(), response.getRemark());
+        }
+        return RemotingSerializable.decode(response.getBody(), LogicalQueueRouteData.class);
+    }
+
+    public LogicalQueueRouteData reuseTopicLogicalQueue(String brokerAddr, String topic, int queueId,
+        int logicalQueueIdx,
+        MessageQueueRouteState messageQueueRouteState, long timeoutMillis) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REUSE_TOPIC_LOGICAL_QUEUE, null);
+        ReuseTopicLogicalQueueRequestBody requestBody = new ReuseTopicLogicalQueueRequestBody();
+        requestBody.setTopic(topic);
+        requestBody.setQueueId(queueId);
+        requestBody.setLogicalQueueIndex(logicalQueueIdx);
+        requestBody.setMessageQueueRouteState(messageQueueRouteState);
+        request.setBody(requestBody.encode());
+        RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, timeoutMillis);
+        assert response != null;
+        if (response.getCode() != ResponseCode.SUCCESS) {
+            throw new MQBrokerException(response.getCode(), response.getRemark());
+        }
+        return RemotingSerializable.decode(response.getBody(), LogicalQueueRouteData.class);
+    }
+
+    public LogicalQueueRouteData createMessageQueueForLogicalQueue(String brokerAddr, String topic, int logicalQueueIdx,
+        MessageQueueRouteState messageQueueStatus,
+        long timeoutMillis) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CREATE_MESSAGE_QUEUE_FOR_LOGICAL_QUEUE, null);
+        CreateMessageQueueForLogicalQueueRequestBody requestBody = new CreateMessageQueueForLogicalQueueRequestBody();
+        requestBody.setTopic(topic);
+        requestBody.setLogicalQueueIndex(logicalQueueIdx);
+        requestBody.setMessageQueueStatus(messageQueueStatus);
+        request.setBody(requestBody.encode());
+        RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, timeoutMillis);
+        assert response != null;
+        if (response.getCode() != ResponseCode.SUCCESS) {
+            throw new MQBrokerException(response.getCode(), response.getRemark());
+        }
+        return RemotingSerializable.decode(response.getBody(), LogicalQueueRouteData.class);
+    }
+
+    private MigrateLogicalQueueBody migrateTopicLogicalQueue(int requestCode, String brokerAddr,
+        LogicalQueueRouteData fromQueueRouteData, LogicalQueueRouteData toQueueRouteData,
+        long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, null);
+        MigrateLogicalQueueBody requestBody = new MigrateLogicalQueueBody();
+        requestBody.setFromQueueRouteData(fromQueueRouteData);
+        requestBody.setToQueueRouteData(toQueueRouteData);
+        request.setBody(requestBody.encode());
+        RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, timeoutMillis);
+        assert response != null;
+        if (response.getCode() != ResponseCode.SUCCESS) {
+            throw new MQBrokerException(response.getCode(), response.getRemark());
+        }
+        return response.getBody() != null ? RemotingSerializable.decode(response.getBody(), MigrateLogicalQueueBody.class) : null;
+    }
+
+    public MigrateLogicalQueueBody migrateTopicLogicalQueuePrepare(String brokerAddr,
+        LogicalQueueRouteData fromQueueRouteData, LogicalQueueRouteData toQueueRouteData,
+        long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        return migrateTopicLogicalQueue(RequestCode.MIGRATE_TOPIC_LOGICAL_QUEUE_PREPARE, brokerAddr, fromQueueRouteData, toQueueRouteData, timeoutMillis);
+    }
+
+    public MigrateLogicalQueueBody migrateTopicLogicalQueueCommit(String brokerAddr,
+        LogicalQueueRouteData fromQueueRouteData, LogicalQueueRouteData toQueueRouteData,
+        long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        return migrateTopicLogicalQueue(RequestCode.MIGRATE_TOPIC_LOGICAL_QUEUE_COMMIT, brokerAddr, fromQueueRouteData, toQueueRouteData, timeoutMillis);
+    }
+
+    public void migrateTopicLogicalQueueNotify(String brokerAddr,
+        LogicalQueueRouteData fromQueueRouteData,
+        LogicalQueueRouteData toQueueRouteData,
+        long timeoutMillis) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        migrateTopicLogicalQueue(RequestCode.MIGRATE_TOPIC_LOGICAL_QUEUE_NOTIFY, brokerAddr, fromQueueRouteData, toQueueRouteData, timeoutMillis);
+    }
+
+    public TopicConfig getTopicConfig(final String brokerAddr, String topic,
+        long timeoutMillis) throws InterruptedException,
+        RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader();
+        header.setTopic(topic);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_CONFIG, header);
+        RemotingCommand response = this.remotingClient
+            .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                return RemotingSerializable.decode(response.getBody(), TopicConfig.class);
+            }
+            default:
+                break;
+        }
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index 4e139c4..59fb42c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -147,7 +147,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
 
     private final MessageQueueLock messageQueueLock = new MessageQueueLock();
 
-    private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<>();
+    private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
 
     public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) {
         this.defaultLitePullConsumer = defaultLitePullConsumer;
@@ -892,6 +892,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
             null
         );
         this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
+        if (pullResult instanceof PullResultWithLogicalQueues) {
+            pullResult = ((PullResultWithLogicalQueues) pullResult).getOrigPullResultExt();
+        }
         if (!this.consumeMessageHookList.isEmpty()) {
             ConsumeMessageContext consumeMessageContext = new ConsumeMessageContext();
             consumeMessageContext.setNamespace(defaultLitePullConsumer.getNamespace());
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index eed5fa4..e54d9b6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -265,6 +265,9 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
             null
         );
         this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
+        if (pullResult instanceof PullResultWithLogicalQueues) {
+            pullResult = ((PullResultWithLogicalQueues) pullResult).getOrigPullResultExt();
+        }
         //If namespace is not null , reset Topic without namespace.
         this.resetTopic(pullResult.getMsgFoundList());
         if (!this.consumeMessageHookList.isEmpty()) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
index cc42a9e..d16002f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
@@ -16,18 +16,14 @@
  */
 package org.apache.rocketmq.client.impl.consumer;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
+import com.alibaba.fastjson.JSON;
+import com.google.common.base.Objects;
 import org.apache.rocketmq.client.consumer.PullCallback;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.PullStatus;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.exception.MQRedirectException;
 import org.apache.rocketmq.client.hook.FilterMessageContext;
 import org.apache.rocketmq.client.hook.FilterMessageHook;
 import org.apache.rocketmq.client.impl.CommunicationMode;
@@ -37,18 +33,34 @@ import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.filter.ExpressionType;
-import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
+import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
+import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.sysflag.PullSysFlag;
+import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.google.common.base.Optional.fromNullable;
+
 public class PullAPIWrapper {
     private final InternalLogger log = ClientLogger.getLog();
     private final MQClientInstance mQClientFactory;
@@ -69,7 +81,24 @@ public class PullAPIWrapper {
 
     public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
         final SubscriptionData subscriptionData) {
-        PullResultExt pullResultExt = (PullResultExt) pullResult;
+        final PullResultExt pullResultExt = (PullResultExt) pullResult;
+
+        LogicalQueueRouteData queueRouteData = null;
+        PullResultWithLogicalQueues pullResultWithLogicalQueues = null;
+        if (pullResultExt instanceof PullResultWithLogicalQueues) {
+            pullResultWithLogicalQueues = (PullResultWithLogicalQueues) pullResultExt;
+            queueRouteData = pullResultWithLogicalQueues.getQueueRouteData();
+        }
+
+        if (queueRouteData != null) {
+            pullResultWithLogicalQueues.setOrigPullResultExt(new PullResultExt(pullResultExt.getPullStatus(),
+                queueRouteData.toLogicalQueueOffset(pullResultExt.getNextBeginOffset()),
+                queueRouteData.toLogicalQueueOffset(pullResultExt.getMinOffset()),
+                queueRouteData.toLogicalQueueOffset(pullResultExt.getMaxOffset()),
+                pullResultExt.getMsgFoundList(),
+                pullResultExt.getSuggestWhichBrokerId(),
+                pullResultExt.getMessageBinary()));
+        }
 
         this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
         if (PullStatus.FOUND == pullResult.getPullStatus()) {
@@ -105,6 +134,10 @@ public class PullAPIWrapper {
                 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
                     Long.toString(pullResult.getMaxOffset()));
                 msg.setBrokerName(mq.getBrokerName());
+                msg.setQueueId(mq.getQueueId());
+                if (queueRouteData != null) {
+                    msg.setQueueOffset(queueRouteData.toLogicalQueueOffset(msg.getQueueOffset()));
+                }
             }
 
             pullResultExt.setMsgFoundList(msgListFilterAgain);
@@ -112,7 +145,7 @@ public class PullAPIWrapper {
 
         pullResultExt.setMessageBinary(null);
 
-        return pullResult;
+        return pullResultExt;
     }
 
     public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {
@@ -141,24 +174,67 @@ public class PullAPIWrapper {
     }
 
     public PullResult pullKernelImpl(
-        final MessageQueue mq,
+        MessageQueue mq,
+        final String subExpression,
+        final String expressionType,
+        final long subVersion,
+        long offset,
+        final int maxNums,
+        final int sysFlag,
+        long commitOffset,
+        final long brokerSuspendMaxTimeMillis,
+        final long timeoutMillis,
+        final CommunicationMode communicationMode,
+        PullCallback pullCallback
+    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(mq.getBrokerName())) {
+            LogicalQueueContext logicalQueueContext = new LogicalQueueContext(mq, subExpression, expressionType, subVersion, offset, maxNums, sysFlag, commitOffset, brokerSuspendMaxTimeMillis, timeoutMillis, communicationMode, pullCallback);
+            while (true) {
+                try {
+                    MessageQueue messageQueue = logicalQueueContext.getModifiedMessageQueue();
+                    if (messageQueue == null) {
+                        if (pullCallback != null) {
+                            pullCallback.onSuccess(logicalQueueContext.getPullResult());
+                            return null;
+                        } else {
+                            return logicalQueueContext.getPullResult();
+                        }
+                    }
+                    PullResult pullResult = this.pullKernelImplWithoutRetry(messageQueue, subExpression, expressionType, subVersion, logicalQueueContext.getModifiedOffset(), maxNums, sysFlag, logicalQueueContext.getModifiedCommitOffset(), brokerSuspendMaxTimeMillis, timeoutMillis, communicationMode, logicalQueueContext.wrapPullCallback());
+                    return logicalQueueContext.wrapPullResult(pullResult);
+                } catch (MQRedirectException e) {
+                    if (!logicalQueueContext.shouldRetry(e)) {
+                        throw new MQBrokerException(ResponseCode.SYSTEM_ERROR, "redirect");
+                    }
+                }
+            }
+        } else {
+            return this.pullKernelImplWithoutRetry(mq, subExpression, expressionType, subVersion, offset, maxNums, sysFlag, commitOffset, brokerSuspendMaxTimeMillis, timeoutMillis, communicationMode, pullCallback);
+        }
+    }
+
+    public PullResult pullKernelImplWithoutRetry(
+        MessageQueue mq,
         final String subExpression,
         final String expressionType,
         final long subVersion,
-        final long offset,
+        long offset,
         final int maxNums,
         final int sysFlag,
-        final long commitOffset,
+        long commitOffset,
         final long brokerSuspendMaxTimeMillis,
         final long timeoutMillis,
         final CommunicationMode communicationMode,
-        final PullCallback pullCallback
+        PullCallback pullCallback
     ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        String topic = mq.getTopic();
+        int queueId = mq.getQueueId();
+
         FindBrokerResult findBrokerResult =
             this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                 this.recalculatePullFromWhichNode(mq), false);
         if (null == findBrokerResult) {
-            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
+            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
             findBrokerResult =
                 this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                     this.recalculatePullFromWhichNode(mq), false);
@@ -181,8 +257,8 @@ public class PullAPIWrapper {
 
             PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
             requestHeader.setConsumerGroup(this.consumerGroup);
-            requestHeader.setTopic(mq.getTopic());
-            requestHeader.setQueueId(mq.getQueueId());
+            requestHeader.setTopic(topic);
+            requestHeader.setQueueId(queueId);
             requestHeader.setQueueOffset(offset);
             requestHeader.setMaxMsgNums(maxNums);
             requestHeader.setSysFlag(sysFlagInner);
@@ -194,17 +270,15 @@ public class PullAPIWrapper {
 
             String brokerAddr = findBrokerResult.getBrokerAddr();
             if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
-                brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr);
+                brokerAddr = computePullFromWhichFilterServer(topic, brokerAddr);
             }
 
-            PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
+            return this.mQClientFactory.getMQClientAPIImpl().pullMessage(
                 brokerAddr,
                 requestHeader,
                 timeoutMillis,
                 communicationMode,
                 pullCallback);
-
-            return pullResult;
         }
 
         throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
@@ -269,4 +343,212 @@ public class PullAPIWrapper {
     public void setDefaultBrokerId(long defaultBrokerId) {
         this.defaultBrokerId = defaultBrokerId;
     }
+
+    private class LogicalQueueContext implements PullCallback {
+        private final MessageQueue mq;
+        private final String subExpression;
+        private final String expressionType;
+        private final long subVersion;
+        private final long offset;
+        private final int maxNums;
+        private final int sysFlag;
+        private final long commitOffset;
+        private final long brokerSuspendMaxTimeMillis;
+        private final long timeoutMillis;
+        private final CommunicationMode communicationMode;
+        private final PullCallback pullCallback;
+
+        private volatile LogicalQueuesInfo logicalQueuesInfo;
+        private volatile LogicalQueueRouteData logicalQueueRouteData;
+
+        private volatile PullResultExt pullResult = null;
+
+        private final AtomicInteger retry = new AtomicInteger();
+
+        public LogicalQueueContext(MessageQueue mq, String subExpression, String expressionType, long subVersion,
+            long offset, int maxNums, int sysFlag, long commitOffset, long brokerSuspendMaxTimeMillis,
+            long timeoutMillis, CommunicationMode communicationMode,
+            PullCallback pullCallback) {
+            this.mq = mq;
+            this.subExpression = subExpression;
+            this.expressionType = expressionType;
+            this.subVersion = subVersion;
+            this.offset = offset;
+            this.maxNums = maxNums;
+            this.sysFlag = sysFlag;
+            this.commitOffset = commitOffset;
+            this.brokerSuspendMaxTimeMillis = brokerSuspendMaxTimeMillis;
+            this.timeoutMillis = timeoutMillis;
+            this.communicationMode = communicationMode;
+            this.pullCallback = pullCallback;
+
+            this.buildLogicalQueuesInfo();
+        }
+
+        private boolean notUsingLogicalQueue() {
+            return !Objects.equal(mq.getBrokerName(), MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME) || this.logicalQueuesInfo == null;
+        }
+
+        private void buildLogicalQueuesInfo() {
+            TopicRouteData topicRouteData = PullAPIWrapper.this.mQClientFactory.queryTopicRouteData(mq.getTopic());
+            if (topicRouteData != null) {
+                this.logicalQueuesInfo = topicRouteData.getLogicalQueuesInfo();
+            }
+        }
+
+        @Override public void onSuccess(PullResult pullResult) {
+            this.pullCallback.onSuccess(this.wrapPullResult(pullResult));
+        }
+
+        @Override public void onException(Throwable t) {
+            if (!this.shouldRetry(t)) {
+                this.pullCallback.onException(t);
+                return;
+            }
+            MessageQueue messageQueue = this.getModifiedMessageQueue();
+            if (messageQueue == null) {
+                this.pullCallback.onSuccess(this.getPullResult());
+                return;
+            }
+            try {
+                PullAPIWrapper.this.pullKernelImplWithoutRetry(messageQueue, subExpression, expressionType, subVersion, this.getModifiedOffset(), maxNums, sysFlag, this.getModifiedCommitOffset(), brokerSuspendMaxTimeMillis, timeoutMillis, communicationMode, this);
+            } catch (Exception e) {
+                this.pullCallback.onException(e);
+            }
+        }
+
+        public MessageQueue getModifiedMessageQueue() {
+            if (this.notUsingLogicalQueue()) {
+                return this.mq;
+            }
+            this.logicalQueuesInfo.readLock().lock();
+            try {
+                List<LogicalQueueRouteData> queueRouteDataList = fromNullable(this.logicalQueuesInfo.get(this.mq.getQueueId())).or(Collections.<LogicalQueueRouteData>emptyList());
+                LogicalQueueRouteData searchKey = new LogicalQueueRouteData();
+                searchKey.setState(MessageQueueRouteState.Normal);
+                searchKey.setLogicalQueueDelta(offset);
+                // it's sorted after getTopicRouteInfoFromNameServer
+                int startIdx = Collections.binarySearch(queueRouteDataList, searchKey);
+                if (startIdx < 0) {
+                    startIdx = -startIdx - 1;
+                    // lower entry
+                    startIdx -= 1;
+                }
+                this.logicalQueueRouteData = null;
+                this.pullResult = null;
+                LogicalQueueRouteData lastReadableLogicalQueueRouteData = null; // first item which delta > offset
+                LogicalQueueRouteData minReadableLogicalQueueRouteData = null;
+                LogicalQueueRouteData maxReadableLogicalQueueRouteData = null;
+                for (int i = 0, size = queueRouteDataList.size(); i < size; i++) {
+                    LogicalQueueRouteData queueRouteData = queueRouteDataList.get(i);
+                    if (!queueRouteData.isReadable()) {
+                        continue;
+                    }
+                    maxReadableLogicalQueueRouteData = queueRouteData;
+                    if (minReadableLogicalQueueRouteData == null) {
+                        minReadableLogicalQueueRouteData = queueRouteData;
+                        if (i < startIdx) {
+                            // must consider following `i++` operation when invoke `continue`, so decrease first
+                            i = startIdx - 1;
+                            continue;
+                        }
+                    }
+                    if (queueRouteData.getLogicalQueueDelta() > offset) {
+                        if (this.logicalQueueRouteData != null) {
+                            break;
+                        } else {
+                            if (lastReadableLogicalQueueRouteData == null) {
+                                lastReadableLogicalQueueRouteData = queueRouteData;
+                            }
+                        }
+                    } else {
+                        this.logicalQueueRouteData = queueRouteData;
+                    }
+                }
+                if (this.logicalQueueRouteData == null) {
+                    if (lastReadableLogicalQueueRouteData != null) {
+                        this.pullResult = new PullResultExt(PullStatus.OFFSET_ILLEGAL, lastReadableLogicalQueueRouteData.getLogicalQueueDelta(), minReadableLogicalQueueRouteData.getLogicalQueueDelta(), maxReadableLogicalQueueRouteData.getLogicalQueueDelta(), null, 0, null);
+                        return null;
+                    } else {
+                        if (maxReadableLogicalQueueRouteData != null) {
+                            this.logicalQueueRouteData = maxReadableLogicalQueueRouteData;
+                        } else {
+                            if (!queueRouteDataList.isEmpty()) {
+                                this.logicalQueueRouteData = queueRouteDataList.get(queueRouteDataList.size() - 1);
+                            } else {
+                                pullResult = new PullResultExt(PullStatus.NO_NEW_MSG, 0, 0, 0, null, 0, null);
+                                return null;
+                            }
+                        }
+                    }
+                }
+                return this.logicalQueueRouteData.getMessageQueue();
+            } finally {
+                this.logicalQueuesInfo.readLock().unlock();
+            }
+        }
+
+        public PullResultExt getPullResult() {
+            return pullResult;
+        }
+
+        public PullCallback wrapPullCallback() {
+            if (this.notUsingLogicalQueue()) {
+                return this.pullCallback;
+            }
+            if (!CommunicationMode.ASYNC.equals(this.communicationMode)) {
+                return this.pullCallback;
+            }
+            return this;
+        }
+
+        public long getModifiedOffset() {
+            return this.logicalQueueRouteData.toMessageQueueOffset(this.offset);
+        }
+
+        public long getModifiedCommitOffset() {
+            // TODO should this be modified too? If offset is not in current broker's range, how do we handle it?
+            return this.commitOffset;
+        }
+
+        public void incrRetry() {
+            this.retry.incrementAndGet();
+        }
+
+        public boolean shouldRetry(Throwable t) {
+            this.incrRetry();
+            if (this.retry.get() >= 3) {
+                return false;
+            }
+            if (t instanceof MQRedirectException) {
+                MQRedirectException e = (MQRedirectException) t;
+                this.processResponseBody(e.getBody());
+                return true;
+            }
+            return false;
+        }
+
+        public PullResult wrapPullResult(PullResult pullResult) {
+            if (pullResult == null) {
+                return null;
+            }
+            // method PullAPIWrapper#processPullResult will modify queueOffset/nextBeginOffset/minOffset/maxOffset
+            return new PullResultWithLogicalQueues(pullResult, this.logicalQueueRouteData);
+        }
+
+        public void processResponseBody(byte[] responseBody) {
+            log.info("LogicalQueueContext.processResponseBody got redirect {}: {}", this.logicalQueueRouteData, responseBody != null ? new String(responseBody, MessageDecoder.CHARSET_UTF8) : null);
+            if (responseBody != null) {
+                try {
+                    List<LogicalQueueRouteData> queueRouteDataList = JSON.parseObject(responseBody, MixAll.TYPE_LIST_LOGICAL_QUEUE_ROUTE_DATA);
+                    this.logicalQueuesInfo.updateLogicalQueueRouteDataList(this.mq.getQueueId(), queueRouteDataList);
+                    return;
+                } catch (Exception e) {
+                    log.warn("LogicalQueueContext.processResponseBody {} update exception, fallback to updateTopicRouteInfoFromNameServer", this.logicalQueueRouteData, e);
+                }
+            }
+            PullAPIWrapper.this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic(), false, null, Collections.singleton(this.mq.getQueueId()));
+            this.buildLogicalQueuesInfo();
+        }
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullResultWithLogicalQueues.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullResultWithLogicalQueues.java
new file mode 100644
index 0000000..b0fb409
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullResultWithLogicalQueues.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import java.util.List;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
+
+public class PullResultWithLogicalQueues extends PullResultExt {
+    private PullResultExt origPullResultExt;
+    private final LogicalQueueRouteData queueRouteData;
+
+    public PullResultWithLogicalQueues(PullResult pullResult,
+        LogicalQueueRouteData floorQueueRouteData) {
+        super(pullResult.getPullStatus(), pullResult.getNextBeginOffset(), pullResult.getMinOffset(), pullResult.getMaxOffset(), pullResult.getMsgFoundList(),
+            pullResult instanceof PullResultExt ? ((PullResultExt) pullResult).getSuggestWhichBrokerId() : MixAll.MASTER_ID,
+            pullResult instanceof PullResultExt ? ((PullResultExt) pullResult).getMessageBinary() : null);
+        if (pullResult instanceof PullResultExt) {
+            this.origPullResultExt = (PullResultExt) pullResult;
+        } else {
+            this.origPullResultExt = new PullResultExt(pullResult.getPullStatus(), pullResult.getNextBeginOffset(), pullResult.getMinOffset(), pullResult.getMaxOffset(), pullResult.getMsgFoundList(), MixAll.MASTER_ID, null);
+        }
+        queueRouteData = floorQueueRouteData;
+    }
+
+    public PullResult getOrigPullResultExt() {
+        return origPullResultExt;
+    }
+
+    public LogicalQueueRouteData getQueueRouteData() {
+        return queueRouteData;
+    }
+
+    public void setOrigPullResultExt(PullResultExt pullResultExt) {
+        this.origPullResultExt = pullResultExt;
+    }
+
+    @Override public PullStatus getPullStatus() {
+        return origPullResultExt.getPullStatus();
+    }
+
+    @Override public long getNextBeginOffset() {
+        return origPullResultExt.getNextBeginOffset();
+    }
+
+    @Override public long getMinOffset() {
+        return origPullResultExt.getMinOffset();
+    }
+
+    @Override public long getMaxOffset() {
+        return origPullResultExt.getMaxOffset();
+    }
+
+    @Override public List<MessageExt> getMsgFoundList() {
+        return origPullResultExt.getMsgFoundList();
+    }
+
+    @Override public void setMsgFoundList(List<MessageExt> msgFoundList) {
+        origPullResultExt.setMsgFoundList(msgFoundList);
+    }
+
+    @Override public byte[] getMessageBinary() {
+        return origPullResultExt.getMessageBinary();
+    }
+
+    @Override public void setMessageBinary(byte[] messageBinary) {
+        origPullResultExt.setMessageBinary(messageBinary);
+    }
+
+    @Override public long getSuggestWhichBrokerId() {
+        return origPullResultExt.getSuggestWhichBrokerId();
+    }
+
+    @Override public String toString() {
+        return "PullResultWithLogicalQueues{" +
+            "origPullResultExt=" + origPullResultExt +
+            ", queueRouteData=" + queueRouteData +
+            '}';
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 81e6d84..7f9eb82 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.client.impl.factory;
 
 import java.io.UnsupportedEncodingException;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -35,7 +36,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.admin.MQAdminExtInner;
@@ -64,10 +64,9 @@ import org.apache.rocketmq.common.ServiceState;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.filter.ExpressionType;
-import org.apache.rocketmq.common.protocol.NamespaceUtil;
-import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
 import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
 import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
@@ -76,8 +75,11 @@ import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
 import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
+import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -172,6 +174,32 @@ public class MQClientInstance {
             }
 
             info.setOrderTopic(true);
+        } else if (route.getOrderTopicConf() == null  && route.getLogicalQueuesInfo() != null) {
+            info.setOrderTopic(false);
+            List<MessageQueue> messageQueueList = info.getMessageQueueList();
+            LogicalQueuesInfo logicalQueueInfo = route.getLogicalQueuesInfo();
+            for (Map.Entry<Integer, List<LogicalQueueRouteData>> entry : logicalQueueInfo.entrySet()) {
+                boolean someWritable = false;
+                for (LogicalQueueRouteData logicalQueueRouteData : entry.getValue()) {
+                    if (logicalQueueRouteData.isWritable()) {
+                        someWritable = true;
+                        break;
+                    }
+                }
+                if (!someWritable) {
+                    continue;
+                }
+                MessageQueue mq = new MessageQueue();
+                mq.setQueueId(entry.getKey());
+                mq.setBrokerName(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
+                mq.setTopic(topic);
+                messageQueueList.add(mq);
+            }
+            Collections.sort(messageQueueList, new Comparator<MessageQueue>() {
+                @Override public int compare(MessageQueue o1, MessageQueue o2) {
+                    return MixAll.compareInteger(o1.getQueueId(), o2.getQueueId());
+                }
+            });
         } else {
             List<QueueData> qds = route.getQueueDatas();
             Collections.sort(qds);
@@ -208,6 +236,27 @@ public class MQClientInstance {
 
     public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {
         Set<MessageQueue> mqList = new HashSet<MessageQueue>();
+        if (route.getLogicalQueuesInfo() != null) {
+            LogicalQueuesInfo logicalQueueInfo = route.getLogicalQueuesInfo();
+            for (Map.Entry<Integer, List<LogicalQueueRouteData>> entry : logicalQueueInfo.entrySet()) {
+                boolean someReadable = false;
+                for (LogicalQueueRouteData logicalQueueRouteData : entry.getValue()) {
+                    if (logicalQueueRouteData.isReadable()) {
+                        someReadable = true;
+                        break;
+                    }
+                }
+                if (!someReadable) {
+                    continue;
+                }
+                MessageQueue mq = new MessageQueue();
+                mq.setQueueId(entry.getKey());
+                mq.setBrokerName(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
+                mq.setTopic(topic);
+                mqList.add(mq);
+            }
+            return mqList;
+        }
         List<QueueData> qds = route.getQueueDatas();
         for (QueueData qd : qds) {
             if (PermName.isReadable(qd.getPerm())) {
@@ -604,6 +653,11 @@ public class MQClientInstance {
 
     public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
         DefaultMQProducer defaultMQProducer) {
+        return this.updateTopicRouteInfoFromNameServer(topic, isDefault, defaultMQProducer, null);
+    }
+
+    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
+        DefaultMQProducer defaultMQProducer, Set<Integer> logicalQueueIdsFilter) {
         try {
             if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                 try {
@@ -619,7 +673,7 @@ public class MQClientInstance {
                             }
                         }
                     } else {
-                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
+                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3, true, logicalQueueIdsFilter);
                     }
                     if (topicRouteData != null) {
                         TopicRouteData old = this.topicRouteTable.get(topic);
@@ -631,7 +685,24 @@ public class MQClientInstance {
                         }
 
                         if (changed) {
-                            TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
+                            TopicRouteData cloneTopicRouteData = new TopicRouteData(topicRouteData);
+                            if (logicalQueueIdsFilter != null && cloneTopicRouteData.getLogicalQueuesInfo() != null) {
+                                TopicRouteData curTopicRouteData = this.topicRouteTable.get(topic);
+                                if (curTopicRouteData != null) {
+                                    LogicalQueuesInfo curLogicalQueuesInfo = curTopicRouteData.getLogicalQueuesInfo();
+                                    if (curLogicalQueuesInfo != null) {
+                                        LogicalQueuesInfo cloneLogicalQueuesInfo = cloneTopicRouteData.getLogicalQueuesInfo();
+                                        curLogicalQueuesInfo.readLock().lock();
+                                        try {
+                                            for (Entry<Integer, List<LogicalQueueRouteData>> entry : curLogicalQueuesInfo.entrySet()) {
+                                                cloneLogicalQueuesInfo.putIfAbsent(entry.getKey(), entry.getValue());
+                                            }
+                                        } finally {
+                                            curLogicalQueuesInfo.readLock().unlock();
+                                        }
+                                    }
+                                }
+                            }
 
                             for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                 this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
@@ -789,8 +860,15 @@ public class MQClientInstance {
     private boolean topicRouteDataIsChange(TopicRouteData olddata, TopicRouteData nowdata) {
         if (olddata == null || nowdata == null)
             return true;
-        TopicRouteData old = olddata.cloneTopicRouteData();
-        TopicRouteData now = nowdata.cloneTopicRouteData();
+        LogicalQueuesInfo oldLogicalQueuesInfo = olddata.getLogicalQueuesInfo();
+        LogicalQueuesInfo newLogicalQueuesInfo = nowdata.getLogicalQueuesInfo();
+        if (oldLogicalQueuesInfo != null && newLogicalQueuesInfo != null) {
+            return oldLogicalQueuesInfo.keySet().equals(newLogicalQueuesInfo.keySet());
+        } else if (oldLogicalQueuesInfo != null || newLogicalQueuesInfo != null) {
+            return true;
+        }
+        TopicRouteData old = new TopicRouteData(olddata);
+        TopicRouteData now = new TopicRouteData(nowdata);
         Collections.sort(old.getQueueDatas());
         Collections.sort(old.getBrokerDatas());
         Collections.sort(now.getQueueDatas());
@@ -812,6 +890,10 @@ public class MQClientInstance {
             }
         }
 
+        if (result) {
+            return true;
+        }
+
         {
             Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
             while (it.hasNext() && !result) {
@@ -1249,4 +1331,13 @@ public class MQClientInstance {
     public ClientConfig getClientConfig() {
         return clientConfig;
     }
+
+    public TopicRouteData queryTopicRouteData(String topic) {
+        TopicRouteData data = this.getAnExistTopicRouteData(topic);
+        if (data == null) {
+            this.updateTopicRouteInfoFromNameServer(topic);
+            data = this.getAnExistTopicRouteData(topic);
+        }
+        return data;
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index fac3ed3..8802a9c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -16,12 +16,16 @@
  */
 package org.apache.rocketmq.client.impl.producer;
 
+import com.alibaba.fastjson.JSON;
+import com.google.common.base.Objects;
 import java.io.IOException;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Random;
 import java.util.Set;
 import java.util.Timer;
@@ -41,6 +45,7 @@ import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.client.common.ClientErrorCode;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.exception.MQRedirectException;
 import org.apache.rocketmq.client.exception.RequestTimeoutException;
 import org.apache.rocketmq.client.hook.CheckForbiddenContext;
 import org.apache.rocketmq.client.hook.CheckForbiddenHook;
@@ -62,6 +67,7 @@ import org.apache.rocketmq.client.producer.RequestFutureTable;
 import org.apache.rocketmq.client.producer.RequestResponseFuture;
 import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendResultForLogicalQueue;
 import org.apache.rocketmq.client.producer.SendStatus;
 import org.apache.rocketmq.client.producer.TransactionCheckListener;
 import org.apache.rocketmq.client.producer.TransactionListener;
@@ -86,6 +92,9 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
 import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
 import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
+import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 import org.apache.rocketmq.common.utils.CorrelationIdUtil;
 import org.apache.rocketmq.logging.InternalLogger;
@@ -96,6 +105,8 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
 
+;
+
 public class DefaultMQProducerImpl implements MQProducerInner {
     private final InternalLogger log = ClientLogger.getLog();
     private final Random random = new Random();
@@ -502,7 +513,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
      *
      * @param msg
      * @param sendCallback
-     * @param timeout the <code>sendCallback</code> will be invoked at most time
+     * @param timeout      the <code>sendCallback</code> will be invoked at most time
      * @throws RejectedExecutionException
      */
     @Deprecated
@@ -718,6 +729,38 @@ public class DefaultMQProducerImpl implements MQProducerInner {
         final SendCallback sendCallback,
         final TopicPublishInfo topicPublishInfo,
         final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(mq.getBrokerName())) {
+            LogicalQueueSendContext logicalQueueContext = new LogicalQueueSendContext(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
+            while (true) {
+                try {
+                    SendResult sendResult = this.sendKernelImplWithoutRetry(msg,
+                        logicalQueueContext.getModifiedMessageQueue(),
+                        communicationMode,
+                        logicalQueueContext.wrapSendCallback(),
+                        topicPublishInfo,
+                        timeout);
+                    return logicalQueueContext.wrapSendResult(sendResult);
+                } catch (MQRedirectException e) {
+                    if (!logicalQueueContext.shouldRetry(e)) {
+                        throw new MQBrokerException(ResponseCode.SYSTEM_ERROR, "redirect");
+                    }
+                } catch (RemotingException e) {
+                    if (!logicalQueueContext.shouldRetry(e)) {
+                        throw e;
+                    }
+                }
+            }
+        } else {
+            return sendKernelImplWithoutRetry(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
+        }
+    }
+
+    private SendResult sendKernelImplWithoutRetry(final Message msg,
+        final MessageQueue mq,
+        final CommunicationMode communicationMode,
+        SendCallback sendCallback,
+        final TopicPublishInfo topicPublishInfo,
+        final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         long beginStartTime = System.currentTimeMillis();
         String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
         if (null == brokerAddr) {
@@ -754,6 +797,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
                     sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                 }
 
+                if (!CommunicationMode.ONEWAY.equals(communicationMode)) {
+                    sysFlag |= MessageSysFlag.LOGICAL_QUEUE_FLAG;
+                }
+
                 if (hasCheckForbiddenHook()) {
                     CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                     checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
@@ -1006,6 +1053,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
             executeEndTransactionHook(context);
         }
     }
+
     /**
      * DEFAULT ONEWAY -------------------------------------------------------
      */
@@ -1058,7 +1106,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
      * @param msg
      * @param mq
      * @param sendCallback
-     * @param timeout the <code>sendCallback</code> will be invoked at most time
+     * @param timeout      the <code>sendCallback</code> will be invoked at most time
      * @throws MQClientException
      * @throws RemotingException
      * @throws InterruptedException
@@ -1188,7 +1236,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
      * @param selector
      * @param arg
      * @param sendCallback
-     * @param timeout the <code>sendCallback</code> will be invoked at most time
+     * @param timeout      the <code>sendCallback</code> will be invoked at most time
      * @throws MQClientException
      * @throws RemotingException
      * @throws InterruptedException
@@ -1528,7 +1576,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
         }
     }
 
-    private Message waitResponse(Message msg, long timeout, RequestResponseFuture requestResponseFuture, long cost) throws InterruptedException, RequestTimeoutException, MQClientException {
+    private Message waitResponse(Message msg, long timeout, RequestResponseFuture requestResponseFuture,
+        long cost) throws InterruptedException, RequestTimeoutException, MQClientException {
         Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
         if (responseMessage == null) {
             if (requestResponseFuture.isSendRequestOk()) {
@@ -1644,4 +1693,178 @@ public class DefaultMQProducerImpl implements MQProducerInner {
     public DefaultMQProducer getDefaultMQProducer() {
         return defaultMQProducer;
     }
+
+    private class LogicalQueueSendContext implements SendCallback {
+        private final Message msg;
+        private final MessageQueue mq;
+        private final CommunicationMode communicationMode;
+        private final SendCallback sendCallback;
+        private final TopicPublishInfo topicPublishInfo;
+        private final long timeout;
+
+        private volatile LogicalQueuesInfo logicalQueuesInfo;
+        private volatile LogicalQueueRouteData writableQueueRouteData;
+
+        private final AtomicInteger retry = new AtomicInteger();
+
+        public LogicalQueueSendContext(Message msg, MessageQueue mq,
+            CommunicationMode communicationMode, SendCallback sendCallback,
+            TopicPublishInfo topicPublishInfo, long timeout) {
+            this.msg = msg;
+            this.mq = mq;
+            this.communicationMode = communicationMode;
+            this.sendCallback = sendCallback;
+            this.topicPublishInfo = topicPublishInfo;
+            this.timeout = timeout;
+
+            if (topicPublishInfo == null) {
+                topicPublishInfo = DefaultMQProducerImpl.this.tryToFindTopicPublishInfo(mq.getTopic());
+            }
+            if (topicPublishInfo != null) {
+                this.logicalQueuesInfo = topicPublishInfo.getTopicRouteData().getLogicalQueuesInfo();
+            } else {
+                this.logicalQueuesInfo = null;
+            }
+        }
+
+        private boolean notUsingLogicalQueue() {
+            return !Objects.equal(mq.getBrokerName(), MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME) || this.logicalQueuesInfo == null;
+        }
+
+        public MessageQueue getModifiedMessageQueue() throws MQClientException {
+            if (this.notUsingLogicalQueue()) {
+                return this.mq;
+            }
+            this.writableQueueRouteData = getWritableQueueRouteData();
+            MessageQueue mq = new MessageQueue(this.mq);
+            mq.setBrokerName(writableQueueRouteData.getBrokerName());
+            mq.setQueueId(writableQueueRouteData.getQueueId());
+            return mq;
+        }
+
+        private LogicalQueueRouteData getWritableQueueRouteData() throws MQClientException {
+            this.logicalQueuesInfo.readLock().lock();
+            try {
+                List<LogicalQueueRouteData> queueRouteDataList = logicalQueuesInfo.get(mq.getQueueId());
+                if (queueRouteDataList == null || queueRouteDataList.size() == 0) {
+                    throw new MQClientException(String.format(Locale.ENGLISH, "send to a logical queue %d but no queue route data found", mq.getQueueId()), null);
+                }
+                // usually writable queue is placed in the last position, or second last when queue migrating
+                for (int i = queueRouteDataList.size() - 1; i >= 0; i--) {
+                    LogicalQueueRouteData queueRouteData = queueRouteDataList.get(i);
+                    if (queueRouteData.isWritable()) {
+                        return queueRouteData;
+                    }
+                }
+                throw new MQClientException(String.format(Locale.ENGLISH, "send to a logical queue %d but no writable queue route data found", mq.getQueueId()), null);
+            } finally {
+                this.logicalQueuesInfo.readLock().unlock();
+            }
+        }
+
+        @Override public void onSuccess(SendResult sendResult) {
+            this.sendCallback.onSuccess(this.wrapSendResult(sendResult));
+        }
+
+        @Override public void onException(Throwable t) {
+            if (this.shouldRetry(t)) {
+                try {
+                    DefaultMQProducerImpl.this.sendKernelImplWithoutRetry(msg, this.getModifiedMessageQueue(), communicationMode, this, topicPublishInfo, timeout);
+                    return;
+                } catch (Exception e) {
+                    t = e;
+                }
+            }
+            if (t instanceof MQRedirectException) {
+                t = new MQBrokerException(ResponseCode.SYSTEM_ERROR, "redirect");
+            }
+            this.sendCallback.onException(t);
+        }
+
+        private void handleRedirectException(MQRedirectException re) {
+            byte[] responseBody = re.getBody();
+            log.info("LogicalQueueContext.processResponseBody got redirect {}: {}", this.writableQueueRouteData, responseBody != null ? new String(responseBody, MessageDecoder.CHARSET_UTF8) : null);
+
+            try {
+                List<LogicalQueueRouteData> newQueueRouteDataList = JSON.parseObject(responseBody, MixAll.TYPE_LIST_LOGICAL_QUEUE_ROUTE_DATA);
+                this.logicalQueuesInfo.updateLogicalQueueRouteDataList(this.mq.getQueueId(), newQueueRouteDataList);
+            } catch (Exception e) {
+                log.warn("LogicalQueueContext.processResponseBody {} update exception, fallback to updateTopicRouteInfoFromNameServer", this.writableQueueRouteData, e);
+                DefaultMQProducerImpl.this.mQClientFactory.updateTopicRouteInfoFromNameServer(this.mq.getTopic(), false, null, Collections.singleton(mq.getQueueId()));
+                TopicRouteData topicRouteData = DefaultMQProducerImpl.this.mQClientFactory.getAnExistTopicRouteData(mq.getTopic());
+                if (topicRouteData != null) {
+                    this.logicalQueuesInfo = topicRouteData.getLogicalQueuesInfo();
+                } else {
+                    this.logicalQueuesInfo = null;
+                }
+            }
+        }
+
+        public SendCallback wrapSendCallback() {
+            if (this.notUsingLogicalQueue()) {
+                return this.sendCallback;
+            }
+            if (!CommunicationMode.ASYNC.equals(this.communicationMode)) {
+                return this.sendCallback;
+            }
+            return this;
+        }
+
+        public boolean shouldRetry(Throwable t) {
+            this.incrRetry();
+            if (this.exceedMaxRetry()) {
+                log.warn("retry {} too many times: {}", this.retry.get(), this.writableQueueRouteData);
+                return false;
+            }
+            if (!this.writableQueueRouteData.isWritable()) {
+                log.warn("no writable queue: {}", this.writableQueueRouteData);
+                return false;
+            }
+            if (t instanceof MQRedirectException) {
+                this.handleRedirectException((MQRedirectException) t);
+                return true;
+            }
+            return !(t instanceof RemotingException) || this.handleRemotingException((RemotingException) t);
+        }
+
+        public boolean exceedMaxRetry() {
+            return this.retry.get() >= 3;
+        }
+
+        public void incrRetry() {
+            this.retry.incrementAndGet();
+        }
+
+        public SendResult wrapSendResult(SendResult sendResult) {
+            if (sendResult == null) {
+                return null;
+            }
+            SendResultForLogicalQueue newSendResult = new SendResultForLogicalQueue(sendResult, this.writableQueueRouteData.getLogicalQueueIndex());
+            long queueOffset = newSendResult.getQueueOffset();
+            if (queueOffset >= 0) {
+                newSendResult.setQueueOffset(LogicalQueueSendContext.this.writableQueueRouteData.toLogicalQueueOffset(queueOffset));
+            }
+            return newSendResult;
+        }
+
+        public boolean handleRemotingException(RemotingException e) {
+            if (e instanceof RemotingTooMuchRequestException) {
+                return false;
+            }
+            DefaultMQProducerImpl.this.mQClientFactory.updateTopicRouteInfoFromNameServer(this.mq.getTopic(), false, null, Collections.singleton(mq.getQueueId()));
+            this.logicalQueuesInfo = DefaultMQProducerImpl.this.getTopicPublishInfoTable().get(mq.getTopic()).getTopicRouteData().getLogicalQueuesInfo();
+            LogicalQueueRouteData writableQueueRouteData;
+            try {
+                writableQueueRouteData = this.getWritableQueueRouteData();
+            } catch (MQClientException ce) {
+                log.warn("getWritableQueueRouteData exception: {}", this.logicalQueuesInfo.get(mq.getQueueId()), ce);
+                return false;
+            }
+            if (Objects.equal(this.writableQueueRouteData.getMessageQueue(), writableQueueRouteData.getMessageQueue()) && writableQueueRouteData.isWritable()) {
+                // still same MessageQueue and still writable, no need to retry
+                return false;
+            }
+            return true;
+        }
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
index ea3d07e..c74b3cd 100644
--- a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
+++ b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
@@ -19,8 +19,8 @@ package org.apache.rocketmq.client.latency;
 
 import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
 import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.InternalLogger;
 
 public class MQFaultStrategy {
     private final static InternalLogger log = ClientLogger.getLog();
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/SendResult.java b/client/src/main/java/org/apache/rocketmq/client/producer/SendResult.java
index 8094883..dd7ea1c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/SendResult.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/SendResult.java
@@ -28,6 +28,7 @@ public class SendResult {
     private String offsetMsgId;
     private String regionId;
     private boolean traceOn = true;
+    private byte[] rawRespBody;
 
     public SendResult() {
     }
@@ -130,4 +131,12 @@ public class SendResult {
         return "SendResult [sendStatus=" + sendStatus + ", msgId=" + msgId + ", offsetMsgId=" + offsetMsgId + ", messageQueue=" + messageQueue
             + ", queueOffset=" + queueOffset + "]";
     }
+
+    public void setRawRespBody(byte[] body) {
+        this.rawRespBody = body;
+    }
+
+    public byte[] getRawRespBody() {
+        return rawRespBody;
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/SendResultForLogicalQueue.java b/client/src/main/java/org/apache/rocketmq/client/producer/SendResultForLogicalQueue.java
new file mode 100644
index 0000000..09cd469
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/SendResultForLogicalQueue.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.producer;
+
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+public class SendResultForLogicalQueue extends SendResult {
+    private final String origBrokerName;
+    private final int origQueueId;
+
+    public SendResultForLogicalQueue(SendResult sendResult, int logicalQueueIdx) {
+        super(sendResult.getSendStatus(), sendResult.getMsgId(), sendResult.getOffsetMsgId(), new MessageQueue(sendResult.getMessageQueue().getTopic(), MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, logicalQueueIdx), sendResult.getQueueOffset());
+        this.origBrokerName = sendResult.getMessageQueue().getBrokerName();
+        this.origQueueId = sendResult.getMessageQueue().getQueueId();
+    }
+
+    public String getOrigBrokerName() {
+        return origBrokerName;
+    }
+
+    public int getOrigQueueId() {
+        return origQueueId;
+    }
+
+    @Override public String toString() {
+        return "SendResultForLogicalQueue{" +
+            "origBrokerName='" + origBrokerName + '\'' +
+            ", origQueueId=" + origQueueId +
+            "} " + super.toString();
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageOpenTracingHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageOpenTracingHookImpl.java
index 28fccae..fe97c77 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageOpenTracingHookImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageOpenTracingHookImpl.java
@@ -51,7 +51,7 @@ public class ConsumeMessageOpenTracingHookImpl implements ConsumeMessageHook {
         if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
             return;
         }
-        List<Span> spanList = new ArrayList<>();
+        List<Span> spanList = new ArrayList<Span>();
         for (MessageExt msg : context.getMsgList()) {
             if (msg == null) {
                 continue;
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
index 04b760e..223d7af 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
@@ -95,7 +95,9 @@ public class DefaultLitePullConsumerTest {
     @Before
     public void init() throws Exception {
         ConcurrentMap<String, MQClientInstance> factoryTable = (ConcurrentMap<String, MQClientInstance>) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
-        factoryTable.forEach((s, instance) -> instance.shutdown());
+        for (MQClientInstance instance : factoryTable.values()) {
+            instance.shutdown();
+        }
         factoryTable.clear();
 
         Field field = MQClientInstance.class.getDeclaredField("rebalanceService");
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerLogicalQueueTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerLogicalQueueTest.java
new file mode 100644
index 0000000..15ec564
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerLogicalQueueTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer;
+
+import com.alibaba.fastjson.JSON;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQRedirectException;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.FindBrokerResult;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper;
+import org.apache.rocketmq.client.impl.consumer.PullResultExt;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
+import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
+import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.assertj.core.util.Lists;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DefaultMQPullConsumerLogicalQueueTest {
+    private MQClientInstance mQClientFactory;
+    @Mock
+    private MQClientAPIImpl mQClientAPIImpl;
+    private DefaultMQPullConsumer pullConsumer;
+    private String topic;
+    private static final String cluster = "DefaultCluster";
+    private static final String broker1Name = "BrokerA";
+    private static final String broker1Addr = "127.0.0.2:10911";
+    private static final String broker2Name = "BrokerB";
+    private static final String broker2Addr = "127.0.0.3:10911";
+
+    @Before
+    public void init() throws Exception {
+        topic = "FooBar" + System.nanoTime();
+
+        mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()));
+
+        FieldUtils.writeField(mQClientFactory, "mQClientAPIImpl", mQClientAPIImpl, true);
+
+        pullConsumer = new DefaultMQPullConsumer("FooBarGroup" + System.nanoTime());
+        pullConsumer.setNamesrvAddr("127.0.0.1:9876");
+        pullConsumer.start();
+
+        PullAPIWrapper pullAPIWrapper = pullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper();
+        FieldUtils.writeDeclaredField(pullAPIWrapper, "mQClientFactory", mQClientFactory, true);
+
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRouteData());
+
+        doReturn(new FindBrokerResult(broker1Addr, false)).when(mQClientFactory).findBrokerAddressInSubscribe(eq(broker1Name), anyLong(), anyBoolean());
+        doReturn(new FindBrokerResult(broker2Addr, false)).when(mQClientFactory).findBrokerAddressInSubscribe(eq(broker2Name), anyLong(), anyBoolean());
+    }
+
+    @After
+    public void terminate() {
+        pullConsumer.shutdown();
+    }
+
+    @Test
+    public void testStart_OffsetShouldNotNUllAfterStart() {
+        Assert.assertNotNull(pullConsumer.getOffsetStore());
+    }
+
+    @Test
+    public void testPullMessage_Success() throws Exception {
+        doAnswer(new Answer<PullResultExt>() {
+            @Override public PullResultExt answer(InvocationOnMock mock) throws Throwable {
+                PullMessageRequestHeader requestHeader = mock.getArgument(1);
+                return DefaultMQPullConsumerLogicalQueueTest.this.createPullResult(requestHeader, PullStatus.FOUND, Collections.singletonList(new MessageExt()));
+            }
+        }).when(mQClientAPIImpl).pullMessage(eq(broker1Addr), any(PullMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC), (PullCallback) isNull());
+
+        MessageQueue messageQueue = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, 0);
+        PullResult pullResult = pullConsumer.pull(messageQueue, "*", 1024, 3);
+        assertThat(pullResult).isNotNull();
+        assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
+        assertThat(pullResult.getNextBeginOffset()).isEqualTo(1024 + 1);
+        assertThat(pullResult.getMinOffset()).isEqualTo(123);
+        assertThat(pullResult.getMaxOffset()).isEqualTo(2048);
+        assertThat(pullResult.getMsgFoundList()).isEqualTo(Collections.emptyList());
+    }
+
+    @Test
+    public void testPullMessage_NotFound() throws Exception {
+        doAnswer(new Answer<PullResult>() {
+            @Override public PullResult answer(InvocationOnMock mock) throws Throwable {
+                PullMessageRequestHeader requestHeader = mock.getArgument(1);
+                return DefaultMQPullConsumerLogicalQueueTest.this.createPullResult(requestHeader, PullStatus.NO_NEW_MSG, new ArrayList<MessageExt>());
+            }
+        }).when(mQClientAPIImpl).pullMessage(eq(broker1Addr), any(PullMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC), (PullCallback) isNull());
+
+        MessageQueue messageQueue = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, 0);
+        PullResult pullResult = pullConsumer.pull(messageQueue, "*", 1024, 3);
+        assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.NO_NEW_MSG);
+    }
+
+    @Test
+    public void testPullMessageAsync_Success() throws Exception {
+        doAnswer(new Answer<PullResult>() {
+            @Override public PullResult answer(InvocationOnMock mock) throws Throwable {
+                PullMessageRequestHeader requestHeader = mock.getArgument(1);
+                PullResult pullResult = DefaultMQPullConsumerLogicalQueueTest.this.createPullResult(requestHeader, PullStatus.FOUND, Collections.singletonList(new MessageExt()));
+
+                PullCallback pullCallback = mock.getArgument(4);
+                pullCallback.onSuccess(pullResult);
+                return null;
+            }
+        }).when(mQClientAPIImpl).pullMessage(eq(broker1Addr), any(PullMessageRequestHeader.class), anyLong(), eq(CommunicationMode.ASYNC), any(PullCallback.class));
+
+        final SettableFuture<PullResult> future = SettableFuture.create();
+        MessageQueue messageQueue = new MessageQueue(topic, broker1Name, 0);
+        pullConsumer.pull(messageQueue, "*", 1024, 3, new PullCallback() {
+            @Override
+            public void onSuccess(PullResult pullResult) {
+                future.set(pullResult);
+            }
+
+            @Override
+            public void onException(Throwable e) {
+                future.setException(e);
+            }
+        });
+        PullResult pullResult = future.get(3, TimeUnit.SECONDS);
+        assertThat(pullResult).isNotNull();
+        assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
+        assertThat(pullResult.getNextBeginOffset()).isEqualTo(1024 + 1);
+        assertThat(pullResult.getMinOffset()).isEqualTo(123);
+        assertThat(pullResult.getMaxOffset()).isEqualTo(2048);
+        assertThat(pullResult.getMsgFoundList()).isEqualTo(Collections.emptyList());
+    }
+
+    @Test
+    public void testPullMessageSync_Redirect() throws Exception {
+        doAnswer(new Answer<PullResult>() {
+            @Override public PullResult answer(InvocationOnMock mock) throws Throwable {
+                throw new MQRedirectException(JSON.toJSONBytes(ImmutableList.of(
+                    new LogicalQueueRouteData(0, 0, new MessageQueue(topic, broker1Name, 0), MessageQueueRouteState.Expired, 0, 0, 0, 0, broker1Addr),
+                    new LogicalQueueRouteData(0, 10, new MessageQueue(topic, broker2Name, 0), MessageQueueRouteState.Normal, 0, -1, -1, -1, broker2Addr)
+                )));
+            }
+        }).when(mQClientAPIImpl).pullMessage(eq(broker1Addr), any(PullMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC), (PullCallback) isNull());
+        doAnswer(new Answer<PullResult>() {
+            @Override public PullResult answer(InvocationOnMock mock) throws Throwable {
+                PullMessageRequestHeader requestHeader = mock.getArgument(1);
+                return DefaultMQPullConsumerLogicalQueueTest.this.createPullResult(requestHeader, PullStatus.FOUND, Collections.singletonList(new MessageExt()));
+            }
+        }).when(mQClientAPIImpl).pullMessage(eq(broker2Addr), any(PullMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC), (PullCallback) isNull());
+
+        MessageQueue messageQueue = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, 0);
+        PullResult pullResult = pullConsumer.pull(messageQueue, "*", 1024, 3);
+        assertThat(pullResult).isNotNull();
+        assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
+        assertThat(pullResult.getNextBeginOffset()).isEqualTo(1024 + 1);
+        assertThat(pullResult.getMinOffset()).isEqualTo(123 + 10);
+        assertThat(pullResult.getMaxOffset()).isEqualTo(2048 + 10);
+        assertThat(pullResult.getMsgFoundList()).isEqualTo(Collections.emptyList());
+    }
+
+    private TopicRouteData createTopicRouteData() {
+        TopicRouteData topicRouteData = new TopicRouteData();
+
+        topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
+        topicRouteData.setBrokerDatas(ImmutableList.of(
+            new BrokerData(cluster, broker1Name, new HashMap<Long, String>(Collections.singletonMap(MixAll.MASTER_ID, broker1Addr))),
+            new BrokerData(cluster, broker2Name, new HashMap<Long, String>(Collections.singletonMap(MixAll.MASTER_ID, broker2Addr)))
+        ));
+
+        List<QueueData> queueDataList = new ArrayList<QueueData>();
+        QueueData queueData;
+        queueData = new QueueData();
+        queueData.setBrokerName(broker1Name);
+        queueData.setPerm(6);
+        queueData.setReadQueueNums(3);
+        queueData.setWriteQueueNums(4);
+        queueData.setTopicSysFlag(0);
+        queueDataList.add(queueData);
+        queueData = new QueueData();
+        queueData.setBrokerName(broker2Name);
+        queueData.setPerm(6);
+        queueData.setReadQueueNums(3);
+        queueData.setWriteQueueNums(4);
+        queueData.setTopicSysFlag(0);
+        queueDataList.add(queueData);
+        topicRouteData.setQueueDatas(queueDataList);
+
+        LogicalQueuesInfo info = new LogicalQueuesInfo();
+        info.put(0, Lists.newArrayList(new LogicalQueueRouteData(0, 0, new MessageQueue(topic, broker1Name, 0), MessageQueueRouteState.Normal, 0, 0, 0, 0, broker1Addr)));
+        topicRouteData.setLogicalQueuesInfo(info);
+        return topicRouteData;
+    }
+
+    private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus,
+        List<MessageExt> messageExtList) throws Exception {
+        return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, new byte[] {});
+    }
+}
\ No newline at end of file
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
index 6c84635..033955b 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
@@ -75,7 +75,6 @@ import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
@@ -95,7 +94,9 @@ public class DefaultMQPushConsumerTest {
     @Before
     public void init() throws Exception {
         ConcurrentMap<String, MQClientInstance> factoryTable = (ConcurrentMap<String, MQClientInstance>) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
-        factoryTable.forEach((s, instance) -> instance.shutdown());
+        for (MQClientInstance instance : factoryTable.values()) {
+            instance.shutdown();
+        }
         factoryTable.clear();
 
         when(mQClientAPIImpl.pullMessage(anyString(), any(PullMessageRequestHeader.class),
@@ -170,7 +171,7 @@ public class DefaultMQPushConsumerTest {
     @Test
     public void testPullMessage_Success() throws InterruptedException, RemotingException, MQBrokerException {
         final CountDownLatch countDownLatch = new CountDownLatch(1);
-        final AtomicReference<MessageExt> messageAtomic = new AtomicReference<>();
+        final AtomicReference<MessageExt> messageAtomic = new AtomicReference<MessageExt>();
         pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
             @Override
             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
@@ -193,7 +194,7 @@ public class DefaultMQPushConsumerTest {
     @Test
     public void testPullMessage_SuccessWithOrderlyService() throws Exception {
         final CountDownLatch countDownLatch = new CountDownLatch(1);
-        final AtomicReference<MessageExt> messageAtomic = new AtomicReference<>();
+        final AtomicReference<MessageExt> messageAtomic = new AtomicReference<MessageExt>();
 
         MessageListenerOrderly listenerOrderly = new MessageListenerOrderly() {
             @Override
@@ -331,10 +332,13 @@ public class DefaultMQPushConsumerTest {
         final MessageExt[] messageExts = new MessageExt[1];
         pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(
                 new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(),
-                        (msgs, context) -> {
+                    new MessageListenerConcurrently() {
+                        @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+                            ConsumeConcurrentlyContext context) {
                             messageExts[0] = msgs.get(0);
                             return null;
-                        }));
+                        }
+                    }));
 
         pushConsumer.getDefaultMQPushConsumerImpl().setConsumeOrderly(true);
         PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
index e8feb80..0faf724 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
@@ -149,7 +149,7 @@ public class ConsumeMessageConcurrentlyServiceTest {
     @Test
     public void testPullMessage_ConsumeSuccess() throws InterruptedException, RemotingException, MQBrokerException, NoSuchFieldException,Exception {
         final CountDownLatch countDownLatch = new CountDownLatch(1);
-        final AtomicReference<MessageExt> messageAtomic = new AtomicReference<>();
+        final AtomicReference<MessageExt> messageAtomic = new AtomicReference<MessageExt>();
 
         ConsumeMessageConcurrentlyService  normalServie = new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
             @Override
diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerLogicalQueueTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerLogicalQueueTest.java
new file mode 100644
index 0000000..12d5cba
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerLogicalQueueTest.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.producer;
+
+import com.alibaba.fastjson.JSON;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.exception.MQRedirectException;
+import org.apache.rocketmq.client.hook.SendMessageContext;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
+import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
+import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.assertj.core.api.ThrowableAssert;
+import org.assertj.core.util.Lists;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DefaultMQProducerLogicalQueueTest {
+    private MQClientInstance mQClientFactory;
+    @Mock
+    private MQClientAPIImpl mQClientAPIImpl;
+
+    private DefaultMQProducer producer;
+    private Message message;
+    private String topic;
+
+    private MessageQueue messageQueue;
+
+    private static final String cluster = "DefaultCluster";
+    private static final String broker1Name = "broker1";
+    private static final String broker2Name = "broker2";
+    private static final String broker1Addr = "127.0.0.2:10911";
+    private static final String broker2Addr = "127.0.0.3:10911";
+
+    @Before
+    public void init() throws Exception {
+        topic = "Foobar" + System.nanoTime();
+        messageQueue = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, 0);
+
+        ConcurrentMap<String, MQClientInstance> factoryTable = (ConcurrentMap<String/* clientId */, MQClientInstance>) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
+        for (MQClientInstance instance : factoryTable.values()) {
+            instance.shutdown();
+        }
+        factoryTable.clear();
+
+        mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()));
+        factoryTable.put(new ClientConfig().buildMQClientId(), mQClientFactory);
+
+        String producerGroupTemp = "FooBar_PID" + System.nanoTime();
+        producer = new DefaultMQProducer(producerGroupTemp);
+        producer.setNamesrvAddr("127.0.0.1:9876");
+        producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE);
+        message = new Message(topic, new byte[] {'a'});
+
+        mQClientFactory.registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
+
+        producer.start();
+
+        FieldUtils.writeDeclaredField(producer.getDefaultMQProducerImpl(), "mQClientFactory", mQClientFactory, true);
+        FieldUtils.writeField(mQClientFactory, "mQClientAPIImpl", mQClientAPIImpl, true);
+
+        when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
+            nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
+        when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
+            (SendCallback) isNull(), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
+            .thenReturn(createSendResult(SendStatus.SEND_OK));
+        when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
+            any(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
+            .thenAnswer(new Answer<SendResult>() {
+                @Override public SendResult answer(InvocationOnMock invocation) throws Throwable {
+                    SendCallback sendCallback = invocation.getArgument(6);
+                    sendCallback.onSuccess(DefaultMQProducerLogicalQueueTest.this.createSendResult(SendStatus.SEND_OK));
+                    return null;
+                }
+            });
+    }
+
+    @After
+    public void terminate() {
+        producer.shutdown();
+    }
+
+    @Test
+    public void testSendMessageSync_Success() throws Exception {
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
+        SendResult sendResult = producer.send(message, messageQueue);
+
+        assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+        assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+        assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
+    }
+
+    @Test
+    public void testSendMessageSync_Redirect() throws Exception {
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
+
+        when(mQClientAPIImpl.sendMessage(eq(broker1Addr), eq(broker1Name), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC),
+            (SendCallback) isNull(), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
+            .thenThrow(new MQRedirectException(null));
+
+        assertThatThrownBy(new ThrowableAssert.ThrowingCallable() {
+            @Override public void call() throws Throwable {
+                producer.send(message, messageQueue);
+            }
+        }).isInstanceOf(MQBrokerException.class).hasMessageContaining("redirect");
+
+        when(mQClientAPIImpl.sendMessage(eq(broker1Addr), eq(broker1Name), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC),
+            (SendCallback) isNull(), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
+            .thenThrow(new MQRedirectException(JSON.toJSONBytes(ImmutableList.of(
+                new LogicalQueueRouteData(0, 0, new MessageQueue(topic, broker1Name, 0), MessageQueueRouteState.Expired, 0, 0, 0, 0, broker1Addr),
+                new LogicalQueueRouteData(0, 10, new MessageQueue(topic, broker2Name, 0), MessageQueueRouteState.Normal, 0, -1, -1, -1, broker2Addr)))));
+        when(mQClientAPIImpl.sendMessage(eq(broker2Addr), eq(broker2Name), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC),
+            (SendCallback) isNull(), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
+            .thenReturn(createSendResult(SendStatus.SEND_OK));
+
+        SendResult sendResult = producer.send(message, messageQueue);
+        assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+        assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+        assertThat(sendResult.getQueueOffset()).isEqualTo(466L);
+    }
+
+    @Test
+    public void testSendMessageSync_RemotingException() throws Exception {
+        TopicRouteData topicRouteData = createTopicRoute();
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(topicRouteData);
+
+        when(mQClientAPIImpl.sendMessage(eq(broker1Addr), eq(broker1Name), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC),
+            (SendCallback) isNull(), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
+            .thenThrow(new RemotingConnectException(broker1Addr));
+        SendResult returnSendResult = createSendResult(SendStatus.SEND_OK);
+        when(mQClientAPIImpl.sendMessage(eq(broker2Addr), eq(broker2Name), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC),
+            (SendCallback) isNull(), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
+            .thenReturn(returnSendResult);
+
+        assertThatThrownBy(new ThrowableAssert.ThrowingCallable() {
+            @Override public void call() throws Throwable {
+                producer.send(message, messageQueue);
+            }
+        }).isInstanceOf(RemotingConnectException.class).hasMessageContaining(broker1Addr);
+
+        topicRouteData.getLogicalQueuesInfo().get(0).add(new LogicalQueueRouteData(0, -1, new MessageQueue(topic, broker2Name, 1), MessageQueueRouteState.WriteOnly, 0, -1, -1, -1, broker2Addr));
+
+        SendResult sendResult = producer.send(message, messageQueue);
+        assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+        assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+        assertThat(sendResult.getQueueOffset()).isEqualTo(-1L);
+    }
+
+    @Test
+    public void testSendMessageAsync_Success() throws Exception {
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
+
+        final SettableFuture<SendResult> future = SettableFuture.create();
+        producer.send(message, messageQueue, new SendCallback() {
+            @Override
+            public void onSuccess(SendResult sendResult) {
+                future.set(sendResult);
+            }
+
+            @Override
+            public void onException(Throwable e) {
+                future.setException(e);
+            }
+        });
+
+        SendResult sendResult = future.get(3, TimeUnit.SECONDS);
+        assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+        assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+        assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
+    }
+
+    @Test
+    public void testSendMessageAsync() throws Exception {
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
+
+        final AtomicReference<SettableFuture<SendResult>> future = new AtomicReference<SettableFuture<SendResult>>();
+        SendCallback sendCallback = new SendCallback() {
+            @Override
+            public void onSuccess(SendResult sendResult) {
+                future.get().set(sendResult);
+            }
+
+            @Override
+            public void onException(Throwable e) {
+                future.get().setException(e);
+            }
+        };
+
+        Message message = new Message();
+        message.setTopic("test");
+        message.setBody("hello world".getBytes());
+        future.set(SettableFuture.<SendResult>create());
+        producer.send(new Message(), messageQueue, sendCallback);
+        assertThatThrownBy(new ThrowableAssert.ThrowingCallable() {
+            @Override public void call() throws Throwable {
+                future.get().get(3, TimeUnit.SECONDS);
+            }
+        }).hasCauseInstanceOf(MQClientException.class).hasMessageContaining("The specified topic is blank");
+
+        //this message is send success
+        message.setTopic(topic);
+        future.set(SettableFuture.<SendResult>create());
+        producer.send(message, messageQueue, sendCallback, 1000);
+        future.get().get(3, TimeUnit.SECONDS);
+    }
+
+    public TopicRouteData createTopicRoute() {
+        TopicRouteData topicRouteData = new TopicRouteData();
+
+        topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
+        topicRouteData.setBrokerDatas(ImmutableList.of(
+            new BrokerData(cluster, broker1Name, new HashMap<Long, String>(Collections.singletonMap(MixAll.MASTER_ID, broker1Addr))),
+            new BrokerData(cluster, broker2Name, new HashMap<Long, String>(Collections.singletonMap(MixAll.MASTER_ID, broker2Addr)))
+        ));
+
+        List<QueueData> queueDataList = new ArrayList<QueueData>();
+        QueueData queueData;
+        queueData = new QueueData();
+        queueData.setBrokerName(broker1Name);
+        queueData.setPerm(6);
+        queueData.setReadQueueNums(3);
+        queueData.setWriteQueueNums(4);
+        queueData.setTopicSysFlag(0);
+        queueDataList.add(queueData);
+        queueData = new QueueData();
+        queueData.setBrokerName(broker2Name);
+        queueData.setPerm(6);
+        queueData.setReadQueueNums(3);
+        queueData.setWriteQueueNums(4);
+        queueData.setTopicSysFlag(0);
+        queueDataList.add(queueData);
+        topicRouteData.setQueueDatas(queueDataList);
+
+        LogicalQueuesInfo info = new LogicalQueuesInfo();
+        info.put(0, Lists.newArrayList(new LogicalQueueRouteData(0, 0, new MessageQueue(topic, broker1Name, 0), MessageQueueRouteState.Normal, 0, 0, 0, 0, broker1Addr)));
+        topicRouteData.setLogicalQueuesInfo(info);
+        return topicRouteData;
+    }
+
+    private SendResult createSendResult(SendStatus sendStatus) {
+        SendResult sendResult = new SendResult();
+        sendResult.setMsgId("123");
+        sendResult.setOffsetMsgId("123");
+        sendResult.setQueueOffset(456);
+        sendResult.setSendStatus(sendStatus);
+        sendResult.setRegionId("HZ");
+        sendResult.setMessageQueue(new MessageQueue(topic, broker1Name, 0));
+        return sendResult;
+    }
+}
diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index 5f29fe1..a8906b3 100644
--- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -48,21 +49,20 @@ import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.remoting.exception.RemotingException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
 import org.mockito.Mock;
 import org.mockito.Spy;
-import org.mockito.invocation.InvocationOnMock;
 import org.mockito.junit.MockitoJUnitRunner;
-import org.mockito.stubbing.Answer;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -153,7 +153,7 @@ public class DefaultMQProducerTest {
 
     @Test
     public void testSendMessageSync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
         SendResult sendResult = producer.send(message);
 
         assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
@@ -163,7 +163,7 @@ public class DefaultMQProducerTest {
 
     @Test
     public void testSendMessageSync_WithBodyCompressed() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
         SendResult sendResult = producer.send(bigMessage);
 
         assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
@@ -174,7 +174,7 @@ public class DefaultMQProducerTest {
     @Test
     public void testSendMessageAsync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
         final CountDownLatch countDownLatch = new CountDownLatch(1);
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
         producer.send(message, new SendCallback() {
             @Override
             public void onSuccess(SendResult sendResult) {
@@ -197,7 +197,7 @@ public class DefaultMQProducerTest {
         final AtomicInteger cc = new AtomicInteger(0);
         final CountDownLatch countDownLatch = new CountDownLatch(6);
 
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
         SendCallback sendCallback = new SendCallback() {
             @Override
             public void onSuccess(SendResult sendResult) {
@@ -239,7 +239,7 @@ public class DefaultMQProducerTest {
         final AtomicInteger cc = new AtomicInteger(0);
         final CountDownLatch countDownLatch = new CountDownLatch(4);
 
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
         SendCallback sendCallback = new SendCallback() {
             @Override
             public void onSuccess(SendResult sendResult) {
@@ -260,7 +260,7 @@ public class DefaultMQProducerTest {
             }
         };
 
-        List<Message> msgs = new ArrayList<>();
+        List<Message> msgs = new ArrayList<Message>();
         for (int i = 0; i < 5; i++) {
             Message message = new Message();
             message.setTopic("test");
@@ -281,7 +281,7 @@ public class DefaultMQProducerTest {
     @Test
     public void testSendMessageAsync_BodyCompressed() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
         final CountDownLatch countDownLatch = new CountDownLatch(1);
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
         producer.send(bigMessage, new SendCallback() {
             @Override
             public void onSuccess(SendResult sendResult) {
@@ -300,7 +300,7 @@ public class DefaultMQProducerTest {
 
     @Test
     public void testSendMessageSync_SuccessWithHook() throws Throwable {
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
         final Throwable[] assertionErrors = new Throwable[1];
         final CountDownLatch countDownLatch = new CountDownLatch(2);
         producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageHook() {
@@ -368,7 +368,7 @@ public class DefaultMQProducerTest {
 
     @Test
     public void testRequestMessage() throws RemotingException, RequestTimeoutException, MQClientException, InterruptedException, MQBrokerException {
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
         final AtomicBoolean finish = new AtomicBoolean(false);
         new Thread(new Runnable() {
             @Override public void run() {
@@ -394,13 +394,13 @@ public class DefaultMQProducerTest {
 
     @Test(expected = RequestTimeoutException.class)
     public void testRequestMessage_RequestTimeoutException() throws RemotingException, RequestTimeoutException, MQClientException, InterruptedException, MQBrokerException {
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
         Message result = producer.request(message, 3 * 1000L);
     }
 
     @Test
     public void testAsyncRequest_OnSuccess() throws Exception {
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
         final CountDownLatch countDownLatch = new CountDownLatch(1);
         RequestCallback requestCallback = new RequestCallback() {
             @Override public void onSuccess(Message message) {
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java
index ecf72ae..258b122 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -102,7 +103,9 @@ public class DefaultMQConsumerWithOpenTracingTest {
     @Before
     public void init() throws Exception {
         ConcurrentMap<String, MQClientInstance> factoryTable = (ConcurrentMap<String, MQClientInstance>) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
-        factoryTable.forEach((s, instance) -> instance.shutdown());
+        for (MQClientInstance instance : factoryTable.values()) {
+            instance.shutdown();
+        }
         factoryTable.clear();
 
         when(mQClientAPIImpl.pullMessage(anyString(), any(PullMessageRequestHeader.class),
@@ -173,7 +176,7 @@ public class DefaultMQConsumerWithOpenTracingTest {
     @Test
     public void testPullMessage_WithTrace_Success() throws InterruptedException, RemotingException, MQBrokerException, MQClientException {
         final CountDownLatch countDownLatch = new CountDownLatch(1);
-        final AtomicReference<MessageExt> messageAtomic = new AtomicReference<>();
+        final AtomicReference<MessageExt> messageAtomic = new AtomicReference<MessageExt>();
         pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
             @Override
             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
@@ -193,7 +196,11 @@ public class DefaultMQConsumerWithOpenTracingTest {
         assertThat(msg.getBody()).isEqualTo(new byte[]{'a'});
 
         // wait until consumeMessageAfter hook of tracer is done surely.
-        waitAtMost(1, TimeUnit.SECONDS).until(() -> tracer.finishedSpans().size() == 1);
+        waitAtMost(1, TimeUnit.SECONDS).until(new Callable<Boolean>() {
+            @Override public Boolean call() throws Exception {
+                return tracer.finishedSpans().size() == 1;
+            }
+        });
         MockSpan span = tracer.finishedSpans().get(0);
         assertThat(span.tags().get(Tags.MESSAGE_BUS_DESTINATION.getKey())).isEqualTo(topic);
         assertThat(span.tags().get(Tags.SPAN_KIND.getKey())).isEqualTo(Tags.SPAN_KIND_CONSUMER);
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
index aec7d2c..1bfc284 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
@@ -115,7 +115,9 @@ public class DefaultMQConsumerWithTraceTest {
     @Before
     public void init() throws Exception {
         ConcurrentMap<String, MQClientInstance> factoryTable = (ConcurrentMap<String, MQClientInstance>) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
-        factoryTable.forEach((s, instance) -> instance.shutdown());
+        for (MQClientInstance instance : factoryTable.values()) {
+            instance.shutdown();
+        }
         factoryTable.clear();
 
         consumerGroup = "FooBarGroup" + System.currentTimeMillis();
@@ -216,7 +218,7 @@ public class DefaultMQConsumerWithTraceTest {
         traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
 
         final CountDownLatch countDownLatch = new CountDownLatch(1);
-        final AtomicReference<MessageExt> messageAtomic = new AtomicReference<>();
+        final AtomicReference<MessageExt> messageAtomic = new AtomicReference<MessageExt>();
         pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
             @Override
             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java
index 5d64a93..0a1f685 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java
@@ -20,6 +20,11 @@ package org.apache.rocketmq.client.trace;
 import io.opentracing.mock.MockSpan;
 import io.opentracing.mock.MockTracer;
 import io.opentracing.tag.Tags;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -47,17 +52,14 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
 import org.mockito.Mock;
 import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -113,7 +115,7 @@ public class DefaultMQProducerWithOpenTracingTest {
     @Test
     public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
         producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, producer.getDefaultMQProducerImpl());
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
         producer.send(message);
         assertThat(tracer.finishedSpans().size()).isEqualTo(1);
         MockSpan span = tracer.finishedSpans().get(0);
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
index 62b3417..c371694 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
@@ -17,6 +17,13 @@
 
 package org.apache.rocketmq.client.trace;
 
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -42,18 +49,17 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
 import org.mockito.Mock;
 import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -121,7 +127,7 @@ public class DefaultMQProducerWithTraceTest {
     @Test
     public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
         traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
         final CountDownLatch countDownLatch = new CountDownLatch(1);
         try {
             producer.send(message);
@@ -133,7 +139,7 @@ public class DefaultMQProducerWithTraceTest {
 
     @Test
     public void testSendMessageSync_WithTrace_NoBrokerSet_Exception() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
         final CountDownLatch countDownLatch = new CountDownLatch(1);
         try {
             producer.send(message);
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java
index dd6d108..aca6254 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java
@@ -20,6 +20,12 @@ package org.apache.rocketmq.client.trace;
 import io.opentracing.mock.MockSpan;
 import io.opentracing.mock.MockTracer;
 import io.opentracing.tag.Tags;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -53,18 +59,14 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
 import org.mockito.Mock;
 import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import java.lang.reflect.Field;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -131,7 +133,7 @@ public class TransactionMQProducerWithOpenTracingTest {
     @Test
     public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
         producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, producer.getDefaultMQProducerImpl());
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
         producer.sendMessageInTransaction(message, null);
 
         assertThat(tracer.finishedSpans().size()).isEqualTo(2);
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java
index f838817..b3a4414 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java
@@ -17,6 +17,13 @@
 
 package org.apache.rocketmq.client.trace;
 
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -50,19 +57,20 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
 import org.mockito.Mock;
 import org.mockito.Spy;
+import org.mockito.invocation.InvocationOnMock;
 import org.mockito.junit.MockitoJUnitRunner;
-
-import java.lang.reflect.Field;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
+import org.mockito.stubbing.Answer;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.when;
 
@@ -127,7 +135,7 @@ public class TransactionMQProducerWithTraceTest {
 
         Field fieldHooks = DefaultMQProducerImpl.class.getDeclaredField("endTransactionHookList");
         fieldHooks.setAccessible(true);
-        List<EndTransactionHook>hooks = new ArrayList<>();
+        List<EndTransactionHook>hooks = new ArrayList<EndTransactionHook>();
         hooks.add(endTransactionHook);
         fieldHooks.set(producer.getDefaultMQProducerImpl(), hooks);
 
@@ -142,12 +150,14 @@ public class TransactionMQProducerWithTraceTest {
     @Test
     public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
         traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
-        AtomicReference<EndTransactionContext> context = new AtomicReference<>();
-        doAnswer(mock -> {
-            context.set(mock.getArgument(0));
-            return null;
-        }).when(endTransactionHook).endTransaction(any());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
+        final AtomicReference<EndTransactionContext> context = new AtomicReference<EndTransactionContext>();
+        doAnswer(new Answer() {
+            @Override public Object answer(InvocationOnMock mock) throws Throwable {
+                context.set(mock.<EndTransactionContext>getArgument(0));
+                return null;
+            }
+        }).when(endTransactionHook).endTransaction(ArgumentMatchers.<EndTransactionContext>any());
         producer.sendMessageInTransaction(message, null);
 
         EndTransactionContext ctx = context.get();
diff --git a/common/pom.xml b/common/pom.xml
index ac1d086..ff32ddf 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -27,6 +27,19 @@
     <artifactId>rocketmq-common</artifactId>
     <name>rocketmq-common ${project.version}</name>
 
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>6</source>
+                    <target>6</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
     <dependencies>
         <dependency>
             <groupId>${project.groupId}</groupId>
@@ -40,5 +53,9 @@
             <groupId>commons-validator</groupId>
             <artifactId>commons-validator</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
     </dependencies>
 </project>
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index d80b3d2..e9e8ce0 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -187,6 +187,8 @@ public class BrokerConfig {
 
     private boolean autoDeleteUnusedStats = false;
 
+    private long forwardTimeout = 3 * 1000;
+
     public static String localHostName() {
         try {
             return InetAddress.getLocalHost().getHostName();
@@ -804,4 +806,12 @@ public class BrokerConfig {
     public void setAutoDeleteUnusedStats(boolean autoDeleteUnusedStats) {
         this.autoDeleteUnusedStats = autoDeleteUnusedStats;
     }
+
+    public long getForwardTimeout() {
+        return forwardTimeout;
+    }
+
+    public void setForwardTimeout(long timeout) {
+        this.forwardTimeout = timeout;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java
index 99b5f0c..13d5b6b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java
+++ b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.common;
 
 import java.io.IOException;
+import java.util.Map;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -67,6 +68,16 @@ public abstract class ConfigManager {
 
     public abstract void decode(final String jsonString);
 
+    public synchronized <T> void persist(String topicName, T t) {
+        // stub for future
+        this.persist();
+    }
+
+    public synchronized <T> void persist(Map<String, T> m) {
+        // stub for future
+        this.persist();
+    }
+
     public synchronized void persist() {
         String jsonString = this.encode(true);
         if (jsonString != null) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index 9d95ecb..8aae04b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.common;
 
+import com.alibaba.fastjson.TypeReference;
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileInputStream;
@@ -26,6 +27,7 @@ import java.lang.annotation.Annotation;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
+import java.lang.reflect.Type;
 import java.net.Inet6Address;
 import java.net.InetAddress;
 import java.net.NetworkInterface;
@@ -41,6 +43,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.common.annotation.ImportantField;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.help.FAQUrl;
+import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 
@@ -83,6 +86,9 @@ public class MixAll {
     public static final String ACL_CONF_TOOLS_FILE = "/conf/tools.yml";
     public static final String REPLY_MESSAGE_FLAG = "reply";
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
+    public static final String LOGICAL_QUEUE_MOCK_BROKER_NAME = "__logical_queue_broker__";
+    public static final Type TYPE_LIST_LOGICAL_QUEUE_ROUTE_DATA = new TypeReference<List<LogicalQueueRouteData>>() {
+        }.getType();
 
     public static String getWSAddr() {
         String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
@@ -443,4 +449,11 @@ public class MixAll {
         return String.format("%.1f %sB", bytes / Math.pow(unit, exp), pre);
     }
 
+    public static int compareInteger(int x, int y) {
+        return (x < y) ? -1 : ((x == y) ? 0 : 1);
+    }
+
+    public static int compareLong(long x, long y) {
+        return (x < y) ? -1 : ((x == y) ? 0 : 1);
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java
index 4795cce..c082ba6 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java
@@ -37,6 +37,16 @@ public class TopicConfig {
         this.topicName = topicName;
     }
 
+    public TopicConfig(TopicConfig other) {
+        this.topicName = other.topicName;
+        this.readQueueNums = other.readQueueNums;
+        this.writeQueueNums = other.writeQueueNums;
+        this.perm = other.perm;
+        this.topicFilterType = other.topicFilterType;
+        this.topicSysFlag = other.topicSysFlag;
+        this.order = other.order;
+    }
+
     public TopicConfig(String topicName, int readQueueNums, int writeQueueNums, int perm) {
         this.topicName = topicName;
         this.readQueueNums = readQueueNums;
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueId.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueId.java
new file mode 100644
index 0000000..ee215af
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueId.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+import com.google.common.base.Objects;
+
+public class TopicQueueId {
+    private final String topic;
+    private final int queueId;
+
+    private final int hash;
+
+    public TopicQueueId(String topic, int queueId) {
+        this.topic = topic;
+        this.queueId = queueId;
+
+        this.hash = Objects.hashCode(topic, queueId);
+    }
+
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        TopicQueueId broker = (TopicQueueId) o;
+        return queueId == broker.queueId && Objects.equal(topic, broker.topic);
+    }
+
+    @Override public int hashCode() {
+        return hash;
+    }
+
+    @Override public String toString() {
+        final StringBuilder sb = new StringBuilder("MessageQueueInBroker{");
+        sb.append("topic='").append(topic).append('\'');
+        sb.append(", queueId=").append(queueId);
+        sb.append('}');
+        return sb.toString();
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
index fe0ae9f..9c8ff39 100644
--- a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
+++ b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
@@ -37,4 +37,5 @@ public class LoggerName {
     public static final String PROTECTION_LOGGER_NAME = "RocketmqProtection";
     public static final String WATER_MARK_LOGGER_NAME = "RocketmqWaterMark";
     public static final String FILTER_LOGGER_NAME = "RocketmqFilter";
+    public static final String STDOUT_LOGGER_NAME = "STDOUT";
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/fastjson/GenericMapSuperclassDeserializer.java b/common/src/main/java/org/apache/rocketmq/common/fastjson/GenericMapSuperclassDeserializer.java
new file mode 100644
index 0000000..e4388eb
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/fastjson/GenericMapSuperclassDeserializer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.fastjson;
+
+import com.alibaba.fastjson.JSONException;
+import com.alibaba.fastjson.parser.DefaultJSONParser;
+import com.alibaba.fastjson.parser.JSONToken;
+import com.alibaba.fastjson.parser.deserializer.MapDeserializer;
+import com.alibaba.fastjson.parser.deserializer.ObjectDeserializer;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.Map;
+
+/**
+ * workaround https://github.com/alibaba/fastjson/issues/3730
+ */
+public class GenericMapSuperclassDeserializer implements ObjectDeserializer {
+    public static final GenericMapSuperclassDeserializer INSTANCE = new GenericMapSuperclassDeserializer();
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    @Override public <T> T deserialze(DefaultJSONParser parser, Type type, Object fieldName) {
+        Class<?> clz = (Class<?>) type;
+        Type genericSuperclass = clz.getGenericSuperclass();
+        Map map;
+        try {
+            map = (Map) clz.newInstance();
+        } catch (Exception e) {
+            throw new JSONException("unsupport type " + type, e);
+        }
+        ParameterizedType parameterizedType = (ParameterizedType) genericSuperclass;
+        Type keyType = parameterizedType.getActualTypeArguments()[0];
+        Type valueType = parameterizedType.getActualTypeArguments()[1];
+        if (String.class == keyType) {
+            return (T) MapDeserializer.parseMap(parser, (Map<String, Object>) map, valueType, fieldName);
+        } else {
+            return (T) MapDeserializer.parseMap(parser, map, keyType, valueType, fieldName);
+        }
+    }
+
+    @Override public int getFastMatchToken() {
+        return JSONToken.LBRACE;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
index 5bdc846..115bff7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
@@ -52,6 +52,8 @@ public class MessageConst {
     public static final String PROPERTY_PUSH_REPLY_TIME = "PUSH_REPLY_TIME";
     public static final String PROPERTY_CLUSTER = "CLUSTER";
     public static final String PROPERTY_MESSAGE_TYPE = "MSG_TYPE";
+    public static final String PROPERTY_FORWARD_QUEUE_ID = "PROPERTY_FORWARD_QUEUE_ID";
+    public static final String PROPERTY_REDIRECT = "REDIRECT";
 
     public static final String KEY_SEPARATOR = " ";
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageQueue.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageQueue.java
index 03ba202..7926b73 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageQueue.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageQueue.java
@@ -28,6 +28,12 @@ public class MessageQueue implements Comparable<MessageQueue>, Serializable {
 
     }
 
+    public MessageQueue(MessageQueue other) {
+        this.topic = other.topic;
+        this.brokerName = other.brokerName;
+        this.queueId = other.queueId;
+    }
+
     public MessageQueue(String topic, String brokerName, int queueId) {
         this.topic = topic;
         this.brokerName = brokerName;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 75ceff3..514aa8d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -188,4 +188,16 @@ public class RequestCode {
     public static final int SEND_REPLY_MESSAGE_V2 = 325;
 
     public static final int PUSH_REPLY_MESSAGE_TO_CLIENT = 326;
+
+    public static final int GET_TOPIC_CONFIG = 351;
+
+    public static final int UPDATE_TOPIC_LOGICAL_QUEUE_MAPPING = 411;
+    public static final int DELETE_TOPIC_LOGICAL_QUEUE_MAPPING = 422;
+    public static final int QUERY_TOPIC_LOGICAL_QUEUE_MAPPING = 413;
+    public static final int SEAL_TOPIC_LOGICAL_QUEUE = 414;
+    public static final int REUSE_TOPIC_LOGICAL_QUEUE = 415;
+    public static final int CREATE_MESSAGE_QUEUE_FOR_LOGICAL_QUEUE = 416;
+    public static final int MIGRATE_TOPIC_LOGICAL_QUEUE_PREPARE = 417;
+    public static final int MIGRATE_TOPIC_LOGICAL_QUEUE_COMMIT = 418;
+    public static final int MIGRATE_TOPIC_LOGICAL_QUEUE_NOTIFY = 419;
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
index dc74444..37d7feb 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.common.protocol;
 import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
 
 public class ResponseCode extends RemotingSysResponseCode {
+    public static final int ASYNC_AND_RETURN_NULL = -2;
 
     public static final int FLUSH_DISK_TIMEOUT = 10;
 
@@ -79,5 +80,4 @@ public class ResponseCode extends RemotingSysResponseCode {
     public static final int DELETE_ACL_CONFIG_FAILED = 210;
 
     public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED = 211;
-
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterInfo.java
index 76c64a8..222e51a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterInfo.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.common.protocol.body;
 
+import com.google.common.base.Objects;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -62,4 +63,17 @@ public class ClusterInfo extends RemotingSerializable {
     public String[] retrieveAllClusterNames() {
         return clusterAddrTable.keySet().toArray(new String[] {});
     }
+
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        ClusterInfo info = (ClusterInfo) o;
+        return Objects.equal(brokerAddrTable, info.brokerAddrTable) && Objects.equal(clusterAddrTable, info.clusterAddrTable);
+    }
+
+    @Override public int hashCode() {
+        return Objects.hashCode(brokerAddrTable, clusterAddrTable);
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/CreateMessageQueueForLogicalQueueRequestBody.java
similarity index 55%
copy from common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/body/CreateMessageQueueForLogicalQueueRequestBody.java
index 871309d..e446d9b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/CreateMessageQueueForLogicalQueueRequestBody.java
@@ -14,25 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.rocketmq.common.protocol.body;
 
-/**
- * $Id: GetMaxOffsetRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
- */
-package org.apache.rocketmq.common.protocol.header;
-
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.annotation.CFNotNull;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
-public class GetMaxOffsetRequestHeader implements CommandCustomHeader {
-    @CFNotNull
+public class CreateMessageQueueForLogicalQueueRequestBody extends RemotingSerializable {
     private String topic;
-    @CFNotNull
-    private Integer queueId;
-
-    @Override
-    public void checkFields() throws RemotingCommandException {
-    }
+    private int logicalQueueIndex;
+    private MessageQueueRouteState messageQueueStatus;
 
     public String getTopic() {
         return topic;
@@ -42,11 +32,19 @@ public class GetMaxOffsetRequestHeader implements CommandCustomHeader {
         this.topic = topic;
     }
 
-    public Integer getQueueId() {
-        return queueId;
+    public int getLogicalQueueIndex() {
+        return logicalQueueIndex;
+    }
+
+    public void setLogicalQueueIndex(int logicalQueueIndex) {
+        this.logicalQueueIndex = logicalQueueIndex;
+    }
+
+    public MessageQueueRouteState getMessageQueueStatus() {
+        return messageQueueStatus;
     }
 
-    public void setQueueId(Integer queueId) {
-        this.queueId = queueId;
+    public void setMessageQueueStatus(MessageQueueRouteState messageQueueStatuses) {
+        this.messageQueueStatus = messageQueueStatuses;
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/MigrateLogicalQueueBody.java
similarity index 52%
copy from common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/body/MigrateLogicalQueueBody.java
index ce12302..6eb06a5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/MigrateLogicalQueueBody.java
@@ -14,33 +14,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.rocketmq.common.protocol.body;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.rocketmq.common.DataVersion;
-import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
-public class TopicConfigSerializeWrapper extends RemotingSerializable {
-    private ConcurrentMap<String, TopicConfig> topicConfigTable =
-        new ConcurrentHashMap<String, TopicConfig>();
-    private DataVersion dataVersion = new DataVersion();
+public class MigrateLogicalQueueBody extends RemotingSerializable {
+    private LogicalQueueRouteData fromQueueRouteData;
+    private LogicalQueueRouteData toQueueRouteData;
 
-    public ConcurrentMap<String, TopicConfig> getTopicConfigTable() {
-        return topicConfigTable;
+    public LogicalQueueRouteData getFromQueueRouteData() {
+        return fromQueueRouteData;
     }
 
-    public void setTopicConfigTable(ConcurrentMap<String, TopicConfig> topicConfigTable) {
-        this.topicConfigTable = topicConfigTable;
+    public void setFromQueueRouteData(
+        LogicalQueueRouteData fromQueueRouteData) {
+        this.fromQueueRouteData = fromQueueRouteData;
     }
 
-    public DataVersion getDataVersion() {
-        return dataVersion;
+    public LogicalQueueRouteData getToQueueRouteData() {
+        return toQueueRouteData;
     }
 
-    public void setDataVersion(DataVersion dataVersion) {
-        this.dataVersion = dataVersion;
+    public void setToQueueRouteData(LogicalQueueRouteData toQueueRouteData) {
+        this.toQueueRouteData = toQueueRouteData;
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ReuseTopicLogicalQueueRequestBody.java
similarity index 53%
copy from common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/body/ReuseTopicLogicalQueueRequestBody.java
index 871309d..22ab452 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ReuseTopicLogicalQueueRequestBody.java
@@ -14,25 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.rocketmq.common.protocol.body;
 
-/**
- * $Id: GetMaxOffsetRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
- */
-package org.apache.rocketmq.common.protocol.header;
-
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.annotation.CFNotNull;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
-public class GetMaxOffsetRequestHeader implements CommandCustomHeader {
-    @CFNotNull
+public class ReuseTopicLogicalQueueRequestBody extends RemotingSerializable {
     private String topic;
-    @CFNotNull
-    private Integer queueId;
-
-    @Override
-    public void checkFields() throws RemotingCommandException {
-    }
+    private int queueId;
+    private int logicalQueueIndex;
+    private MessageQueueRouteState messageQueueRouteState;
 
     public String getTopic() {
         return topic;
@@ -42,11 +33,27 @@ public class GetMaxOffsetRequestHeader implements CommandCustomHeader {
         this.topic = topic;
     }
 
-    public Integer getQueueId() {
+    public int getQueueId() {
         return queueId;
     }
 
-    public void setQueueId(Integer queueId) {
+    public void setQueueId(int queueId) {
         this.queueId = queueId;
     }
+
+    public int getLogicalQueueIndex() {
+        return logicalQueueIndex;
+    }
+
+    public void setLogicalQueueIndex(int logicalQueueIndex) {
+        this.logicalQueueIndex = logicalQueueIndex;
+    }
+
+    public void setMessageQueueRouteState(MessageQueueRouteState messageQueueRouteState) {
+        this.messageQueueRouteState = messageQueueRouteState;
+    }
+
+    public MessageQueueRouteState getMessageQueueRouteState() {
+        return messageQueueRouteState;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/SealTopicLogicalQueueRequestBody.java
similarity index 62%
copy from common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/body/SealTopicLogicalQueueRequestBody.java
index 871309d..edb521f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/SealTopicLogicalQueueRequestBody.java
@@ -14,25 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.rocketmq.common.protocol.body;
 
-/**
- * $Id: GetMaxOffsetRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
- */
-package org.apache.rocketmq.common.protocol.header;
-
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.annotation.CFNotNull;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
-public class GetMaxOffsetRequestHeader implements CommandCustomHeader {
-    @CFNotNull
+public class SealTopicLogicalQueueRequestBody extends RemotingSerializable {
     private String topic;
-    @CFNotNull
-    private Integer queueId;
-
-    @Override
-    public void checkFields() throws RemotingCommandException {
-    }
+    private int queueId;
+    private int logicalQueueIndex;
 
     public String getTopic() {
         return topic;
@@ -42,11 +31,19 @@ public class GetMaxOffsetRequestHeader implements CommandCustomHeader {
         this.topic = topic;
     }
 
-    public Integer getQueueId() {
+    public int getQueueId() {
         return queueId;
     }
 
-    public void setQueueId(Integer queueId) {
+    public void setQueueId(int queueId) {
         this.queueId = queueId;
     }
+
+    public int getLogicalQueueIndex() {
+        return logicalQueueIndex;
+    }
+
+    public void setLogicalQueueIndex(int logicalQueueIndex) {
+        this.logicalQueueIndex = logicalQueueIndex;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
index ce12302..1389663 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
@@ -17,15 +17,18 @@
 
 package org.apache.rocketmq.common.protocol.body;
 
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
 public class TopicConfigSerializeWrapper extends RemotingSerializable {
     private ConcurrentMap<String, TopicConfig> topicConfigTable =
         new ConcurrentHashMap<String, TopicConfig>();
+    private Map<String/* topic */, LogicalQueuesInfo> logicalQueuesInfoMap;
     private DataVersion dataVersion = new DataVersion();
 
     public ConcurrentMap<String, TopicConfig> getTopicConfigTable() {
@@ -43,4 +46,12 @@ public class TopicConfigSerializeWrapper extends RemotingSerializable {
     public void setDataVersion(DataVersion dataVersion) {
         this.dataVersion = dataVersion;
     }
+
+    public Map<String, LogicalQueuesInfo> getLogicalQueuesInfoMap() {
+        return logicalQueuesInfoMap;
+    }
+
+    public void setLogicalQueuesInfoMap(Map<String, LogicalQueuesInfo> logicalQueuesInfoMap) {
+        this.logicalQueuesInfoMap = logicalQueuesInfoMap;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/UpdateTopicLogicalQueueMappingRequestBody.java
similarity index 62%
copy from common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/body/UpdateTopicLogicalQueueMappingRequestBody.java
index 871309d..67c6fd2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/UpdateTopicLogicalQueueMappingRequestBody.java
@@ -14,24 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.rocketmq.common.protocol.body;
 
-/**
- * $Id: GetMaxOffsetRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
- */
-package org.apache.rocketmq.common.protocol.header;
-
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.annotation.CFNotNull;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
-public class GetMaxOffsetRequestHeader implements CommandCustomHeader {
-    @CFNotNull
+public class UpdateTopicLogicalQueueMappingRequestBody extends RemotingSerializable {
     private String topic;
-    @CFNotNull
-    private Integer queueId;
+    private int queueId;
+    private int logicalQueueIdx;
+
+    public int getLogicalQueueIdx() {
+        return logicalQueueIdx;
+    }
 
-    @Override
-    public void checkFields() throws RemotingCommandException {
+    public void setLogicalQueueIdx(int logicalQueueIdx) {
+        this.logicalQueueIdx = logicalQueueIdx;
     }
 
     public String getTopic() {
@@ -42,11 +39,11 @@ public class GetMaxOffsetRequestHeader implements CommandCustomHeader {
         this.topic = topic;
     }
 
-    public Integer getQueueId() {
+    public int getQueueId() {
         return queueId;
     }
 
-    public void setQueueId(Integer queueId) {
+    public void setQueueId(int queueId) {
         this.queueId = queueId;
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteTopicLogicalQueueRequestHeader.java
similarity index 79%
copy from common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteTopicLogicalQueueRequestHeader.java
index a2806e6..fa8d50d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteTopicLogicalQueueRequestHeader.java
@@ -14,22 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-/**
- * $Id: GetRouteInfoRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
- */
-package org.apache.rocketmq.common.protocol.header.namesrv;
+package org.apache.rocketmq.common.protocol.header;
 
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class GetRouteInfoRequestHeader implements CommandCustomHeader {
+public class DeleteTopicLogicalQueueRequestHeader implements CommandCustomHeader {
     @CFNotNull
     private String topic;
 
-    @Override
-    public void checkFields() throws RemotingCommandException {
+    @Override public void checkFields() throws RemotingCommandException {
     }
 
     public String getTopic() {
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
index 871309d..6963195 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
@@ -29,6 +29,8 @@ public class GetMaxOffsetRequestHeader implements CommandCustomHeader {
     private String topic;
     @CFNotNull
     private Integer queueId;
+    private boolean committed;
+    private boolean logicalQueue;
 
     @Override
     public void checkFields() throws RemotingCommandException {
@@ -49,4 +51,20 @@ public class GetMaxOffsetRequestHeader implements CommandCustomHeader {
     public void setQueueId(Integer queueId) {
         this.queueId = queueId;
     }
+
+    public void setCommitted(boolean committed) {
+        this.committed = committed;
+    }
+
+    public boolean isCommitted() {
+        return committed;
+    }
+
+    public void setLogicalQueue(boolean logicalQueue) {
+        this.logicalQueue = logicalQueue;
+    }
+
+    public boolean getLogicalQueue() {
+        return logicalQueue;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
similarity index 84%
copy from common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
index a2806e6..ea9d17c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
@@ -15,28 +15,31 @@
  * limitations under the License.
  */
 
-/**
- * $Id: GetRouteInfoRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
- */
-package org.apache.rocketmq.common.protocol.header.namesrv;
+package org.apache.rocketmq.common.protocol.header;
 
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class GetRouteInfoRequestHeader implements CommandCustomHeader {
-    @CFNotNull
-    private String topic;
-
+public class GetTopicConfigRequestHeader implements CommandCustomHeader {
     @Override
     public void checkFields() throws RemotingCommandException {
     }
 
+    @CFNotNull
+    private String topic;
+
+    /**
+     * @return the topic
+     */
     public String getTopic() {
         return topic;
     }
 
+    /**
+     * @param topic the topic to set
+     */
     public void setTopic(String topic) {
         this.topic = topic;
     }
-}
+}
\ No newline at end of file
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryTopicLogicalQueueMappingRequestHeader.java
similarity index 79%
copy from common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryTopicLogicalQueueMappingRequestHeader.java
index a2806e6..b7e0c46 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryTopicLogicalQueueMappingRequestHeader.java
@@ -14,22 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-/**
- * $Id: GetRouteInfoRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
- */
-package org.apache.rocketmq.common.protocol.header.namesrv;
+package org.apache.rocketmq.common.protocol.header;
 
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class GetRouteInfoRequestHeader implements CommandCustomHeader {
+public class QueryTopicLogicalQueueMappingRequestHeader implements CommandCustomHeader {
     @CFNotNull
     private String topic;
 
-    @Override
-    public void checkFields() throws RemotingCommandException {
+    @Override public void checkFields() throws RemotingCommandException {
     }
 
     public String getTopic() {
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java
index a2806e6..ad776c8 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java
@@ -20,6 +20,7 @@
  */
 package org.apache.rocketmq.common.protocol.header.namesrv;
 
+import java.util.Set;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
@@ -28,6 +29,9 @@ public class GetRouteInfoRequestHeader implements CommandCustomHeader {
     @CFNotNull
     private String topic;
 
+    private int sysFlag;
+    private Set<Integer> logicalQueueIdsFilter;
+
     @Override
     public void checkFields() throws RemotingCommandException {
     }
@@ -39,4 +43,20 @@ public class GetRouteInfoRequestHeader implements CommandCustomHeader {
     public void setTopic(String topic) {
         this.topic = topic;
     }
+
+    public int getSysFlag() {
+        return sysFlag;
+    }
+
+    public void setSysFlag(int sysFlag) {
+        this.sysFlag = sysFlag;
+    }
+
+    public void setLogicalQueueIdsFilter(Set<Integer> filter) {
+        this.logicalQueueIdsFilter = filter;
+    }
+
+    public Set<Integer> getLogicalQueueIdsFilter() {
+        return logicalQueueIdsFilter;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/LogicalQueueRouteData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/route/LogicalQueueRouteData.java
new file mode 100644
index 0000000..1615758
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/route/LogicalQueueRouteData.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.route;
+
+import com.alibaba.fastjson.annotation.JSONField;
+import com.google.common.base.Objects;
+import com.google.common.base.Predicate;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+/**
+ * logical queue offset -> message queue offset mapping
+ */
+public class LogicalQueueRouteData implements Comparable<LogicalQueueRouteData> {
+    private volatile int logicalQueueIndex = -1; /* -1 means not set */
+    private volatile long logicalQueueDelta = -1; /* inclusive, -1 means not set, occurred in writeOnly state */
+
+    private MessageQueue messageQueue;
+
+    private volatile MessageQueueRouteState state = MessageQueueRouteState.Normal;
+
+    private volatile long offsetDelta = 0; // valid when Normal/WriteOnly/ReadOnly
+    private volatile long offsetMax = -1; // exclusive, valid when ReadOnly
+
+    private volatile long firstMsgTimeMillis = -1; // valid when ReadOnly
+    private volatile long lastMsgTimeMillis = -1; // valid when ReadOnly
+
+    private String brokerAddr; /* not always set, only used by high availability forward */
+
+    public LogicalQueueRouteData() {
+    }
+
+    public LogicalQueueRouteData(int logicalQueueIndex, long logicalQueueDelta,
+        MessageQueue messageQueue, MessageQueueRouteState state, long offsetDelta, long offsetMax,
+        long firstMsgTimeMillis,
+        long lastMsgTimeMillis, String brokerAddr) {
+        this.logicalQueueIndex = logicalQueueIndex;
+        this.logicalQueueDelta = logicalQueueDelta;
+        this.messageQueue = messageQueue;
+        this.state = state;
+        this.offsetDelta = offsetDelta;
+        this.offsetMax = offsetMax;
+        this.firstMsgTimeMillis = firstMsgTimeMillis;
+        this.lastMsgTimeMillis = lastMsgTimeMillis;
+        this.brokerAddr = brokerAddr;
+    }
+
+    public LogicalQueueRouteData(LogicalQueueRouteData queueRouteData) {
+        copyFrom(queueRouteData);
+    }
+
+    public int getLogicalQueueIndex() {
+        return logicalQueueIndex;
+    }
+
+    public void setLogicalQueueIndex(int logicalQueueIndex) {
+        this.logicalQueueIndex = logicalQueueIndex;
+    }
+
+    public long getLogicalQueueDelta() {
+        return logicalQueueDelta;
+    }
+
+    public void setLogicalQueueDelta(long logicalQueueDelta) {
+        this.logicalQueueDelta = logicalQueueDelta;
+    }
+
+    public MessageQueue getMessageQueue() {
+        return messageQueue;
+    }
+
+    public void setMessageQueue(MessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
+    }
+
+    public MessageQueueRouteState getState() {
+        return state;
+    }
+
+    @JSONField(serialize = false)
+    public int getStateOrdinal() {
+        return state.ordinal();
+    }
+
+    public void setState(MessageQueueRouteState state) {
+        this.state = state;
+    }
+
+    public String getBrokerAddr() {
+        return brokerAddr;
+    }
+
+    public void setBrokerAddr(String brokerAddr) {
+        this.brokerAddr = brokerAddr;
+    }
+
+    public long getOffsetDelta() {
+        return offsetDelta;
+    }
+
+    public void setOffsetDelta(long offsetDelta) {
+        this.offsetDelta = offsetDelta;
+    }
+
+    public long getOffsetMax() {
+        return offsetMax;
+    }
+
+    public void setOffsetMax(long offsetMax) {
+        this.offsetMax = offsetMax;
+    }
+
+    public long getFirstMsgTimeMillis() {
+        return firstMsgTimeMillis;
+    }
+
+    public void setFirstMsgTimeMillis(long firstMsgTimeMillis) {
+        this.firstMsgTimeMillis = firstMsgTimeMillis;
+    }
+
+    public long getLastMsgTimeMillis() {
+        return lastMsgTimeMillis;
+    }
+
+    public void setLastMsgTimeMillis(long lastMsgTimeMillis) {
+        this.lastMsgTimeMillis = lastMsgTimeMillis;
+    }
+
+    @Override public String toString() {
+        return "LogicalQueueRouteData{" +
+            "logicalQueueIndex=" + logicalQueueIndex +
+            ", logicalQueueDelta=" + logicalQueueDelta +
+            ", messageQueue=" + messageQueue +
+            ", state=" + state +
+            ", offsetDelta=" + offsetDelta +
+            ", offsetMax=" + offsetMax +
+            ", firstMsgTimeMillis=" + firstMsgTimeMillis +
+            ", lastMsgTimeMillis=" + lastMsgTimeMillis +
+            ", brokerAddr='" + brokerAddr + '\'' +
+            '}';
+    }
+
+    public void copyFrom(LogicalQueueRouteData queueRouteData) {
+        this.logicalQueueIndex = queueRouteData.logicalQueueIndex;
+        this.logicalQueueDelta = queueRouteData.logicalQueueDelta;
+        this.messageQueue = new MessageQueue(queueRouteData.getMessageQueue());
+        this.state = queueRouteData.state;
+        this.offsetDelta = queueRouteData.offsetDelta;
+        this.offsetMax = queueRouteData.offsetMax;
+        this.firstMsgTimeMillis = queueRouteData.firstMsgTimeMillis;
+        this.lastMsgTimeMillis = queueRouteData.lastMsgTimeMillis;
+        this.brokerAddr = queueRouteData.brokerAddr;
+    }
+
+    public long toLogicalQueueOffset(long messageQueueOffset) {
+        return this.logicalQueueDelta < 0 ? -1 : messageQueueOffset - this.offsetDelta + this.logicalQueueDelta;
+    }
+
+    public long toMessageQueueOffset(long logicalQueueOffset) {
+        return logicalQueueOffset - this.logicalQueueDelta + this.offsetDelta;
+    }
+
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        LogicalQueueRouteData that = (LogicalQueueRouteData) o;
+        return logicalQueueIndex == that.logicalQueueIndex && logicalQueueDelta == that.logicalQueueDelta && offsetDelta == that.offsetDelta && offsetMax == that.offsetMax && firstMsgTimeMillis == that.firstMsgTimeMillis && lastMsgTimeMillis == that.lastMsgTimeMillis && Objects.equal(messageQueue, that.messageQueue) && state == that.state && Objects.equal(brokerAddr, that.brokerAddr);
+    }
+
+    @Override public int hashCode() {
+        return Objects.hashCode(logicalQueueIndex, logicalQueueDelta, messageQueue, state, offsetDelta, offsetMax, firstMsgTimeMillis, lastMsgTimeMillis, brokerAddr);
+    }
+
+    @JSONField(serialize = false)
+    public long getMessagesCount() {
+        return this.offsetDelta >= 0 && this.offsetMax >= 0 ? this.offsetMax - this.offsetDelta : 0L;
+    }
+
+    @JSONField(serialize = false)
+    public boolean isWritable() {
+        return MessageQueueRouteState.Normal.equals(state) || MessageQueueRouteState.WriteOnly.equals(state);
+    }
+
+    @JSONField(serialize = false)
+    public boolean isReadable() {
+        return MessageQueueRouteState.Normal.equals(state) || MessageQueueRouteState.ReadOnly.equals(state);
+    }
+
+    @JSONField(serialize = false)
+    public boolean isExpired() {
+        return MessageQueueRouteState.Expired.equals(state);
+    }
+
+    @JSONField(serialize = false)
+    public boolean isWriteOnly() {
+        return MessageQueueRouteState.WriteOnly.equals(state);
+    }
+
+    @JSONField(serialize = false)
+    public int getQueueId() {
+        return messageQueue.getQueueId();
+    }
+
+    @JSONField(serialize = false)
+    public String getBrokerName() {
+        return messageQueue.getBrokerName();
+    }
+
+    @JSONField(serialize = false)
+    public String getTopic() {
+        return messageQueue.getTopic();
+    }
+
+
+
+    /**
+     * First compare logicalQueueDelta, negative delta must be ordered in the last;
+     * then compare state's ordinal;
+     * then compare messageQueue, nulls first;
+     * then compare offsetDelta.
+     */
+    @Override
+    public int compareTo(LogicalQueueRouteData o) {
+        long x = this.getLogicalQueueDelta();
+        long y = o.getLogicalQueueDelta();
+        int result;
+        {
+            if (x >= 0 && y >= 0) {
+                result = MixAll.compareLong(x, y);
+            } else if (x < 0 && y < 0) {
+                result = MixAll.compareLong(-x, -y);
+            } else if (x < 0) {
+                // o1 < 0 && o2 >= 0
+                result = 1;
+            } else {
+                // o1 >= 0 && o2 < 0
+                result = -1;
+            }
+        }
+        if (result == 0) {
+            result = MixAll.compareInteger(this.state.ordinal(), o.state.ordinal());
+        }
+        if (result == 0) {
+            if (this.messageQueue == null) {
+                if (o.messageQueue != null) {
+                    result = -1;
+                }
+            } else {
+                if (o.messageQueue != null) {
+                    result = this.messageQueue.compareTo(o.messageQueue);
+                } else {
+                    result = 1;
+                }
+            }
+        }
+        if (result == 0) {
+            result = MixAll.compareLong(this.offsetDelta, o.offsetDelta);
+        }
+        return result;
+    }
+
+    public static final Predicate<LogicalQueueRouteData> READABLE_PREDICT = new Predicate<LogicalQueueRouteData>() {
+        @Override public boolean apply(LogicalQueueRouteData input) {
+            return input != null && input.isReadable();
+        }
+    };
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/LogicalQueuesInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/route/LogicalQueuesInfo.java
new file mode 100644
index 0000000..b1ec584
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/route/LogicalQueuesInfo.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.route;
+
+import com.alibaba.fastjson.annotation.JSONField;
+import com.alibaba.fastjson.parser.ParserConfig;
+import com.google.common.base.Objects;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Lists;
+import org.apache.rocketmq.common.fastjson.GenericMapSuperclassDeserializer;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static com.google.common.base.Optional.fromNullable;
+
+public class LogicalQueuesInfo extends HashMap<Integer, List<LogicalQueueRouteData>> {
+    // TODO whether here needs more fine-grained locks like per logical queue lock?
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    // only be set in broker, will be empty in namesrv
+    private final Map<Integer, LogicalQueueRouteData> queueId2LogicalQueueMap = new ConcurrentHashMap<Integer, LogicalQueueRouteData>();
+
+    public LogicalQueuesInfo() {
+        super();
+    }
+
+    public LogicalQueuesInfo(Map<Integer, List<LogicalQueueRouteData>> m) {
+        super(m);
+    }
+
+    public Lock readLock() {
+        return lock.readLock();
+    }
+
+    public Lock writeLock() {
+        return lock.writeLock();
+    }
+
+    public LogicalQueuesInfo makeDeepCopy() {
+        return this.makeDeepCopy(null);
+    }
+
+    public LogicalQueuesInfo makeDeepCopy(Predicate<LogicalQueueRouteData> predicate) {
+        this.readLock().lock();
+        try {
+            LogicalQueuesInfo copy = new LogicalQueuesInfo();
+            for (Map.Entry<Integer, List<LogicalQueueRouteData>> entry : this.entrySet()) {
+                List<LogicalQueueRouteData> list = Lists.newArrayListWithCapacity(entry.getValue().size());
+                for (LogicalQueueRouteData d : entry.getValue()) {
+                    if (predicate != null && !predicate.apply(d)) {
+                        continue;
+                    }
+                    list.add(new LogicalQueueRouteData(d));
+                }
+                copy.put(entry.getKey(), list);
+            }
+            return copy;
+        } finally {
+            this.readLock().unlock();
+        }
+    }
+
+    public void updateQueueRouteDataByQueueId(int queueId, LogicalQueueRouteData queueRouteData) {
+        if (queueRouteData == null) {
+            queueId2LogicalQueueMap.remove(queueId);
+        } else {
+            queueId2LogicalQueueMap.put(queueId, queueRouteData);
+        }
+    }
+
+    /**
+     * find logical queue route data for message queues owned by this broker
+     *
+     * @param queueId
+     * @return
+     */
+    public LogicalQueueRouteData queryQueueRouteDataByQueueId(int queueId) {
+        return queueId2LogicalQueueMap.get(queueId);
+    }
+
+    public void updateLogicalQueueRouteDataList(int logicalQueueIdx,
+        List<LogicalQueueRouteData> logicalQueueRouteDataList) {
+        logicalQueueRouteDataList = new LinkedList<LogicalQueueRouteData>(logicalQueueRouteDataList);
+        this.writeLock().lock();
+        try {
+            List<LogicalQueueRouteData> queueRouteDataList = this.get(logicalQueueIdx);
+            for (LogicalQueueRouteData logicalQueueRouteData : queueRouteDataList) {
+                for (Iterator<LogicalQueueRouteData> it = logicalQueueRouteDataList.iterator(); it.hasNext(); ) {
+                    LogicalQueueRouteData newQueueRouteData = it.next();
+                    if (Objects.equal(newQueueRouteData.getMessageQueue(), logicalQueueRouteData.getMessageQueue())) {
+                        logicalQueueRouteData.copyFrom(newQueueRouteData);
+                        it.remove();
+                        break;
+                    }
+                }
+                if (logicalQueueRouteDataList.isEmpty()) {
+                    break;
+                }
+            }
+            for (LogicalQueueRouteData queueRouteData : logicalQueueRouteDataList) {
+                int idx = Collections.binarySearch(queueRouteDataList, queueRouteData);
+                if (idx < 0) {
+                    idx = -idx - 1;
+                }
+                queueRouteDataList.add(idx, queueRouteData);
+            }
+        } finally {
+            this.writeLock().unlock();
+        }
+    }
+
+    @JSONField(serialize = false)
+    public Collection<LogicalQueueRouteData> getAllOwnedLogicalQueueRouteData() {
+        return queueId2LogicalQueueMap.values();
+    }
+
+    public LogicalQueueRouteData nextAvailableLogicalRouteData(LogicalQueueRouteData queueRouteData,
+        Predicate<LogicalQueueRouteData> predicate) {
+        this.readLock().lock();
+        try {
+            List<LogicalQueueRouteData> queueRouteDataList = fromNullable(this.get(queueRouteData.getLogicalQueueIndex())).or(Collections.<LogicalQueueRouteData>emptyList());
+            int idx = Collections.binarySearch(queueRouteDataList, queueRouteData);
+            if (idx >= 0) {
+                for (int i = idx + 1, size = queueRouteDataList.size(); i < size; i++) {
+                    LogicalQueueRouteData tmp = queueRouteDataList.get(i);
+                    if (predicate.apply(tmp)) {
+                        return tmp;
+                    }
+                }
+            }
+        } finally {
+            this.readLock().unlock();
+        }
+        return null;
+    }
+
+    static {
+        // workaround https://github.com/alibaba/fastjson/issues/3730
+        ParserConfig.getGlobalInstance().putDeserializer(LogicalQueuesInfo.class, GenericMapSuperclassDeserializer.INSTANCE);
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/LogicalQueuesInfoUnordered.java b/common/src/main/java/org/apache/rocketmq/common/protocol/route/LogicalQueuesInfoUnordered.java
new file mode 100644
index 0000000..73d6061
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/route/LogicalQueuesInfoUnordered.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.route;
+
+import com.alibaba.fastjson.parser.ParserConfig;
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+import org.apache.rocketmq.common.fastjson.GenericMapSuperclassDeserializer;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Only used inside namesrv, between client and namesrv, to reduce cpu usage of namesrv
+ */
+public class LogicalQueuesInfoUnordered extends ConcurrentHashMap<Integer, Map<LogicalQueuesInfoUnordered.Key, LogicalQueueRouteData>> {
+    static {
+        // workaround https://github.com/alibaba/fastjson/issues/3730
+        ParserConfig.getGlobalInstance().putDeserializer(LogicalQueuesInfoUnordered.class, GenericMapSuperclassDeserializer.INSTANCE);
+    }
+
+    public LogicalQueuesInfoUnordered() {
+        super();
+    }
+
+    public LogicalQueuesInfoUnordered(int size) {
+        super(size);
+    }
+
+    public LogicalQueuesInfo toLogicalQueuesInfoOrdered() {
+        LogicalQueuesInfo logicalQueuesInfoOrdered = new LogicalQueuesInfo();
+        for (Map.Entry<Integer, Map<Key, LogicalQueueRouteData>> entry : this.entrySet()) {
+            List<LogicalQueueRouteData> list = Lists.newArrayListWithExpectedSize(entry.getValue().size());
+            for (LogicalQueueRouteData d : entry.getValue().values()) {
+                list.add(new LogicalQueueRouteData(d));
+            }
+            Collections.sort(list);
+            logicalQueuesInfoOrdered.put(entry.getKey(), list);
+        }
+        return logicalQueuesInfoOrdered;
+    }
+
+    public static class Key {
+        private final String brokerName;
+        private final int queueId;
+
+        private final long offsetDelta;
+
+        private final int hash;
+
+        public Key(String brokerName, int queueId, long offsetDelta) {
+            this.brokerName = brokerName;
+            this.queueId = queueId;
+            this.offsetDelta = offsetDelta;
+
+            this.hash = Objects.hashCode(brokerName, queueId, this.offsetDelta);
+        }
+
+        public String getBrokerName() {
+            return brokerName;
+        }
+
+        public int getQueueId() {
+            return queueId;
+        }
+
+        public long getOffsetDelta() {
+            return offsetDelta;
+        }
+
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+            Key id = (Key) o;
+            return queueId == id.queueId && offsetDelta == id.offsetDelta && Objects.equal(brokerName, id.brokerName);
+        }
+
+        @Override public int hashCode() {
+            return hash;
+        }
+
+        @Override public String toString() {
+            return "Key{" +
+                "brokerName='" + brokerName + '\'' +
+                ", queueId=" + queueId +
+                ", offsetDelta=" + offsetDelta +
+                '}';
+        }
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/route/MessageQueueRouteState.java
similarity index 54%
copy from common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/route/MessageQueueRouteState.java
index a2806e6..e6b48fc 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/route/MessageQueueRouteState.java
@@ -14,29 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.rocketmq.common.protocol.route;
 
-/**
- * $Id: GetRouteInfoRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
- */
-package org.apache.rocketmq.common.protocol.header.namesrv;
-
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.annotation.CFNotNull;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-
-public class GetRouteInfoRequestHeader implements CommandCustomHeader {
-    @CFNotNull
-    private String topic;
-
-    @Override
-    public void checkFields() throws RemotingCommandException {
-    }
-
-    public String getTopic() {
-        return topic;
-    }
-
-    public void setTopic(String topic) {
-        this.topic = topic;
-    }
+public enum MessageQueueRouteState {
+    // do not change below order, since ordinal() is used
+    Expired,
+    ReadOnly,
+    Normal,
+    WriteOnly,
+    ;
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java
index e8f54b8..4470a2a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java
@@ -30,27 +30,32 @@ public class TopicRouteData extends RemotingSerializable {
     private List<QueueData> queueDatas;
     private List<BrokerData> brokerDatas;
     private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
+    private LogicalQueuesInfo logicalQueuesInfo;
 
-    public TopicRouteData cloneTopicRouteData() {
-        TopicRouteData topicRouteData = new TopicRouteData();
-        topicRouteData.setQueueDatas(new ArrayList<QueueData>());
-        topicRouteData.setBrokerDatas(new ArrayList<BrokerData>());
-        topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
-        topicRouteData.setOrderTopicConf(this.orderTopicConf);
+    public TopicRouteData() {
+    }
+
+    public TopicRouteData(TopicRouteData topicRouteData) {
+        this.queueDatas = new ArrayList<QueueData>();
+        this.brokerDatas = new ArrayList<BrokerData>();
+        this.filterServerTable = new HashMap<String, List<String>>();
+        this.orderTopicConf = topicRouteData.orderTopicConf;
 
-        if (this.queueDatas != null) {
-            topicRouteData.getQueueDatas().addAll(this.queueDatas);
+        if (topicRouteData.queueDatas != null) {
+            this.queueDatas.addAll(topicRouteData.queueDatas);
         }
 
-        if (this.brokerDatas != null) {
-            topicRouteData.getBrokerDatas().addAll(this.brokerDatas);
+        if (topicRouteData.brokerDatas != null) {
+            this.brokerDatas.addAll(topicRouteData.brokerDatas);
         }
 
-        if (this.filterServerTable != null) {
-            topicRouteData.getFilterServerTable().putAll(this.filterServerTable);
+        if (topicRouteData.filterServerTable != null) {
+            this.filterServerTable.putAll(topicRouteData.filterServerTable);
         }
 
-        return topicRouteData;
+        if (topicRouteData.logicalQueuesInfo != null) {
+            this.logicalQueuesInfo = new LogicalQueuesInfo(topicRouteData.logicalQueuesInfo);
+        }
     }
 
     public List<QueueData> getQueueDatas() {
@@ -85,6 +90,14 @@ public class TopicRouteData extends RemotingSerializable {
         this.orderTopicConf = orderTopicConf;
     }
 
+    public LogicalQueuesInfo getLogicalQueuesInfo() {
+        return logicalQueuesInfo;
+    }
+
+    public void setLogicalQueuesInfo(LogicalQueuesInfo logicalQueuesInfo) {
+        this.logicalQueuesInfo = logicalQueuesInfo;
+    }
+
     @Override
     public int hashCode() {
         final int prime = 31;
@@ -93,6 +106,7 @@ public class TopicRouteData extends RemotingSerializable {
         result = prime * result + ((orderTopicConf == null) ? 0 : orderTopicConf.hashCode());
         result = prime * result + ((queueDatas == null) ? 0 : queueDatas.hashCode());
         result = prime * result + ((filterServerTable == null) ? 0 : filterServerTable.hashCode());
+        result = prime * result + ((logicalQueuesInfo == null) ? 0 : logicalQueuesInfo.hashCode());
         return result;
     }
 
@@ -125,12 +139,17 @@ public class TopicRouteData extends RemotingSerializable {
                 return false;
         } else if (!filterServerTable.equals(other.filterServerTable))
             return false;
+        if (logicalQueuesInfo == null) {
+            if (other.logicalQueuesInfo != null)
+                return false;
+        } else if (!logicalQueuesInfo.equals(other.logicalQueuesInfo))
+            return false;
         return true;
     }
 
     @Override
     public String toString() {
         return "TopicRouteData [orderTopicConf=" + orderTopicConf + ", queueDatas=" + queueDatas
-            + ", brokerDatas=" + brokerDatas + ", filterServerTable=" + filterServerTable + "]";
+            + ", brokerDatas=" + brokerDatas + ", filterServerTable=" + filterServerTable + ", logicalQueuesInfo=" + logicalQueuesInfo + "]";
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteDataNameSrv.java b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteDataNameSrv.java
new file mode 100644
index 0000000..e9fb84e
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteDataNameSrv.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.route;
+
+import com.google.common.base.Objects;
+
+public class TopicRouteDataNameSrv extends TopicRouteData {
+    private LogicalQueuesInfoUnordered logicalQueuesInfoUnordered;
+
+    public TopicRouteDataNameSrv() {
+    }
+
+    public LogicalQueuesInfoUnordered getLogicalQueuesInfoUnordered() {
+        return logicalQueuesInfoUnordered;
+    }
+
+    public void setLogicalQueuesInfoUnordered(
+        LogicalQueuesInfoUnordered logicalQueuesInfoUnordered) {
+        this.logicalQueuesInfoUnordered = logicalQueuesInfoUnordered;
+    }
+
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        if (!super.equals(o))
+            return false;
+        TopicRouteDataNameSrv srv = (TopicRouteDataNameSrv) o;
+        return Objects.equal(logicalQueuesInfoUnordered, srv.logicalQueuesInfoUnordered);
+    }
+
+    @Override public int hashCode() {
+        return Objects.hashCode(super.hashCode(), logicalQueuesInfoUnordered);
+    }
+
+    @Override public String toString() {
+        return "TopicRouteDataNameSrv{" +
+            "logicalQueuesInfoUnordered=" + logicalQueuesInfoUnordered +
+            "} " + super.toString();
+    }
+
+    public TopicRouteData toTopicRouteData() {
+        TopicRouteData topicRouteData = new TopicRouteData(this);
+        if (this.logicalQueuesInfoUnordered != null) {
+            topicRouteData.setLogicalQueuesInfo(this.logicalQueuesInfoUnordered.toLogicalQueuesInfoOrdered());
+        }
+        return topicRouteData;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/sysflag/MessageSysFlag.java b/common/src/main/java/org/apache/rocketmq/common/sysflag/MessageSysFlag.java
index d534571..9f39f48 100644
--- a/common/src/main/java/org/apache/rocketmq/common/sysflag/MessageSysFlag.java
+++ b/common/src/main/java/org/apache/rocketmq/common/sysflag/MessageSysFlag.java
@@ -25,6 +25,7 @@ public class MessageSysFlag {
     public final static int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;
     public final static int BORNHOST_V6_FLAG = 0x1 << 4;
     public final static int STOREHOSTADDRESS_V6_FLAG = 0x1 << 5;
+    public final static int LOGICAL_QUEUE_FLAG = 0x1 << 6;
 
     public static int getTransactionValue(final int flag) {
         return flag & TRANSACTION_ROLLBACK_TYPE;
diff --git a/common/src/test/java/org/apache/rocketmq/common/fastjson/GenericMapSuperclassDeserializerTest.java b/common/src/test/java/org/apache/rocketmq/common/fastjson/GenericMapSuperclassDeserializerTest.java
new file mode 100644
index 0000000..5aa5501
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/fastjson/GenericMapSuperclassDeserializerTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.fastjson;
+
+import com.alibaba.fastjson.JSON;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
+import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
+import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfoUnordered;
+import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
+import org.assertj.core.util.Lists;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class GenericMapSuperclassDeserializerTest {
+    @Test
+    public void testLogicalQueuesInfo() throws Exception {
+        LogicalQueuesInfo logicalQueuesInfo = new LogicalQueuesInfo();
+        logicalQueuesInfo.put(0, Lists.newArrayList(new LogicalQueueRouteData(1, 2, new MessageQueue("topic", "broker", 3), MessageQueueRouteState.Normal, 4, 5, 6, 7, "127.1.2.3")));
+
+        byte[] buf = JSON.toJSONBytes(logicalQueuesInfo);
+
+        LogicalQueuesInfo newLogicalQueuesInfo = JSON.parseObject(buf, LogicalQueuesInfo.class);
+
+        assertThat(newLogicalQueuesInfo).isEqualTo(logicalQueuesInfo);
+    }
+
+    @Test
+    public void testLogicalQueuesInfoUnordered() throws Exception {
+        LogicalQueuesInfoUnordered logicalQueuesInfoUnordered = new LogicalQueuesInfoUnordered();
+        MessageQueue mq = new MessageQueue("topic", "broker", 3);
+        logicalQueuesInfoUnordered.put(0, new ConcurrentHashMap<LogicalQueuesInfoUnordered.Key, LogicalQueueRouteData>(Collections.singletonMap(new LogicalQueuesInfoUnordered.Key(mq.getBrokerName(), mq.getQueueId(), 4), new LogicalQueueRouteData(1, 2, mq, MessageQueueRouteState.Normal, 4, 5, 6, 7, "127.1.2.3"))));
+
+        byte[] buf = JSON.toJSONBytes(logicalQueuesInfoUnordered);
+
+        LogicalQueuesInfoUnordered newLogicalQueuesInfoUnordered = JSON.parseObject(buf, LogicalQueuesInfoUnordered.class);
+
+        assertThat(newLogicalQueuesInfoUnordered).isEqualTo(logicalQueuesInfoUnordered);
+    }
+}
\ No newline at end of file
diff --git a/common/src/test/java/org/apache/rocketmq/common/protocol/route/TopicRouteDataTest.java b/common/src/test/java/org/apache/rocketmq/common/protocol/route/TopicRouteDataTest.java
index 5c7c6d1..f72f8f4 100644
--- a/common/src/test/java/org/apache/rocketmq/common/protocol/route/TopicRouteDataTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/protocol/route/TopicRouteDataTest.java
@@ -18,18 +18,13 @@
 package org.apache.rocketmq.common.protocol.route;
 
 
-import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.common.protocol.route.QueueData;
-import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
-import org.junit.Test;
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.junit.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.within;
 
 
 public class TopicRouteDataTest {
@@ -64,7 +59,7 @@ public class TopicRouteDataTest {
         topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
         topicRouteData.setQueueDatas(queueDataList);
 
-        assertThat(topicRouteData.cloneTopicRouteData()).isEqualTo(topicRouteData);
+        assertThat(new TopicRouteData(topicRouteData)).isEqualTo(topicRouteData);
 
     }
 
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java
index f80ff14..46f4797 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java
@@ -18,9 +18,10 @@ package org.apache.rocketmq.remoting.protocol;
 
 import com.alibaba.fastjson.JSON;
 import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 
 public abstract class RemotingSerializable {
-    private final static Charset CHARSET_UTF8 = Charset.forName("UTF-8");
+    public final static Charset CHARSET_UTF8 = StandardCharsets.UTF_8;
 
     public static byte[] encode(final Object obj) {
         final String json = toJson(obj, false);
@@ -35,14 +36,17 @@ public abstract class RemotingSerializable {
     }
 
     public static <T> T decode(final byte[] data, Class<T> classOfT) {
-        final String json = new String(data, CHARSET_UTF8);
-        return fromJson(json, classOfT);
+        return fromJson(data, classOfT);
     }
 
     public static <T> T fromJson(String json, Class<T> classOfT) {
         return JSON.parseObject(json, classOfT);
     }
 
+    private static <T> T fromJson(byte[] data, Class<T> classOfT) {
+        return JSON.parseObject(data, classOfT);
+    }
+
     public byte[] encode() {
         final String json = this.toJson();
         if (json != null) {