You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/15 02:32:29 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Abstract the rpc layer

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new 912613d  Abstract the rpc layer
912613d is described below

commit 912613d2d3352ea0069b14250bbbabe58b15107e
Author: dongeforever <do...@apache.org>
AuthorDate: Mon Nov 15 10:32:11 2021 +0800

    Abstract the rpc layer
---
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java | 100 +---------
 .../broker/processor/AdminBrokerProcessor.java     |   4 +-
 .../broker/processor/PullMessageProcessor.java     |   8 +-
 .../broker/processor/SendMessageProcessor.java     |   2 -
 .../broker/topic/TopicQueueMappingManager.java     |   5 +-
 .../client/impl/factory/MQClientInstance.java      |  34 +---
 .../rocketmq/common/protocol/ResponseCode.java     |   7 +
 .../GetEarliestMsgStoretimeRequestHeader.java      |   2 +-
 .../protocol/header/GetMaxOffsetRequestHeader.java |   2 +-
 .../protocol/header/GetMinOffsetRequestHeader.java |   2 +-
 .../protocol/header/PullMessageRequestHeader.java  |   2 +-
 .../header/QueryConsumerOffsetRequestHeader.java   |   2 +-
 .../protocol/header/SearchOffsetRequestHeader.java |   2 +-
 .../protocol/header/SendMessageRequestHeader.java  |   2 +-
 .../header/UpdateConsumerOffsetRequestHeader.java  |   2 +-
 .../apache/rocketmq/common/rpc/ClientMetadata.java |  86 ++++++++
 .../rocketmq/common/rpc/CommonRpcHeader.java       |  63 ++++++
 .../org/apache/rocketmq/common/rpc/RpcClient.java  |  19 ++
 .../apache/rocketmq/common/rpc/RpcClientHook.java  |  13 ++
 .../apache/rocketmq/common/rpc/RpcClientImpl.java  | 218 +++++++++++++++++++++
 .../apache/rocketmq/common/rpc/RpcClientUtils.java |  39 ++++
 .../apache/rocketmq/common/rpc/RpcException.java   |  24 +++
 .../apache/rocketmq/common/rpc}/RpcRequest.java    |  12 +-
 .../apache/rocketmq/common/rpc}/RpcResponse.java   |  24 ++-
 .../common/rpc/TopicQueueRequestHeader.java        |  13 +-
 .../rocketmq/remoting/TopicQueueRequestHeader.java |  25 ---
 .../remoting/protocol/RemotingCommand.java         |  18 +-
 27 files changed, 528 insertions(+), 202 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 5bb499e..cd8a179 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -26,9 +26,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
-import org.apache.rocketmq.client.consumer.PullStatus;
 import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.impl.consumer.PullResultExt;
 import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
@@ -45,15 +43,10 @@ import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
 import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
-import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeRequestHeader;
 import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader;
-import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
 import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
-import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
-import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
 import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
-import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader;
@@ -66,8 +59,8 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.RemotingClient;
-import org.apache.rocketmq.remoting.RpcRequest;
-import org.apache.rocketmq.remoting.RpcResponse;
+import org.apache.rocketmq.common.rpc.RpcRequest;
+import org.apache.rocketmq.common.rpc.RpcResponse;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -474,95 +467,6 @@ public class BrokerOuterAPI {
         this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis, invokeCallback);
     }
 
-    private String getBrokerAddrByNameOrException(String bname) throws MQBrokerException {
-        String addr = this.brokerController.getBrokerAddrByName(bname);
-        if (addr == null) {
-            throw new MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname, addr);
-        }
-        return addr;
-    }
-
-    public RpcResponse pullMessage(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
-        String addr = getBrokerAddrByNameOrException(bname);
-        RemotingCommand requestCommand = RemotingCommand.createCommandForRpcRequest(rpcRequest);
-        RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
-        assert responseCommand != null;
-
-        switch (responseCommand.getCode()) {
-            case ResponseCode.SUCCESS:
-            case ResponseCode.PULL_NOT_FOUND:
-            case ResponseCode.PULL_RETRY_IMMEDIATELY:
-            case ResponseCode.PULL_OFFSET_MOVED:
-                PullMessageResponseHeader responseHeader =
-                        (PullMessageResponseHeader) responseCommand.decodeCommandCustomHeader(PullMessageResponseHeader.class);
-                return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
-            default:
-                RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null);
-                rpcResponse.setException(new MQBrokerException(responseCommand.getCode(), "unknown remote error", addr));
-                return rpcResponse;
-        }
-    }
-
-    public RpcResponse searchOffset(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
-        String addr = getBrokerAddrByNameOrException(bname);
-        RemotingCommand requestCommand = RemotingCommand.createCommandForRpcRequest(rpcRequest);
-        RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
-        assert responseCommand != null;
-        switch (responseCommand.getCode()) {
-            case ResponseCode.SUCCESS: {
-                SearchOffsetResponseHeader responseHeader =
-                        (SearchOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
-                return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
-            }
-            default:{
-                RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null);
-                rpcResponse.setException(new MQBrokerException(responseCommand.getCode(), "unknown remote error", addr));
-                return rpcResponse;
-            }
-        }
-    }
-
-    public RpcResponse getMinOffset(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
-        String addr = getBrokerAddrByNameOrException(bname);
-
-        RemotingCommand requestCommand = RemotingCommand.createCommandForRpcRequest(rpcRequest);
 
-        RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
-        assert responseCommand != null;
-        switch (responseCommand.getCode()) {
-            case ResponseCode.SUCCESS: {
-                GetMinOffsetResponseHeader responseHeader =
-                        (GetMinOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
-                return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
-            }
-            default:{
-                RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null);
-                rpcResponse.setException(new MQBrokerException(responseCommand.getCode(), "unknown remote error", addr));
-                return rpcResponse;
-            }
-        }
-    }
-
-    public RpcResponse getEarliestMsgStoretime(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
-        String addr = getBrokerAddrByNameOrException(bname);
-
-        RemotingCommand requestCommand = RemotingCommand.createCommandForRpcRequest(rpcRequest);
-
-        RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
-        assert responseCommand != null;
-        switch (responseCommand.getCode()) {
-            case ResponseCode.SUCCESS: {
-                GetEarliestMsgStoretimeResponseHeader responseHeader =
-                        (GetEarliestMsgStoretimeResponseHeader) responseCommand.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class);
-                return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
-
-            }
-            default:{
-                RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null);
-                rpcResponse.setException(new MQBrokerException(responseCommand.getCode(), "unknown remote error", addr));
-                return rpcResponse;
-            }
-        }
-    }
 
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index ee7b31c..9f525f6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -119,8 +119,8 @@ import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.filter.util.BitsArray;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.remoting.RpcRequest;
-import org.apache.rocketmq.remoting.RpcResponse;
+import org.apache.rocketmq.common.rpc.RpcRequest;
+import org.apache.rocketmq.common.rpc.RpcResponse;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 3f2a682..d8cd179 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -36,7 +36,6 @@ import org.apache.rocketmq.broker.longpolling.PullRequest;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
 import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
-import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.common.LogicQueueMappingItem;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
@@ -60,14 +59,15 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
 import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
 import org.apache.rocketmq.common.protocol.topic.OffsetMovedEvent;
+import org.apache.rocketmq.common.rpc.RpcClientUtils;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 import org.apache.rocketmq.common.sysflag.PullSysFlag;
 import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.remoting.RpcRequest;
-import org.apache.rocketmq.remoting.RpcResponse;
+import org.apache.rocketmq.common.rpc.RpcRequest;
+import org.apache.rocketmq.common.rpc.RpcResponse;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
@@ -157,7 +157,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
                     return rewriteResult;
                 }
             }
-            return RemotingCommand.createCommandForRpcResponse(rpcResponse);
+            return RpcClientUtils.createCommandForRpcResponse(rpcResponse);
         } catch (Throwable t) {
             return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
         }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index baa1024..a2d2ace 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -17,7 +17,6 @@
 package org.apache.rocketmq.broker.processor;
 
 import java.net.SocketAddress;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -54,7 +53,6 @@ import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 import org.apache.rocketmq.common.sysflag.TopicSysFlag;
 import org.apache.rocketmq.common.topic.TopicValidator;
-import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.netty.RemotingResponseCallback;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index ae2e75d..ebcefe8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -28,14 +28,11 @@ import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper;
 import org.apache.rocketmq.common.TopicQueueMappingDetail;
-import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
+import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
-import java.util.List;
-import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.Lock;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 950eae7..7818de9 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -87,6 +87,8 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
+import static org.apache.rocketmq.common.rpc.ClientMetadata.topicRouteData2EndpointsForStaticTopic;
+
 public class MQClientInstance {
     private final static long LOCK_TIMEOUT_MILLIS = 3000;
     private final static InternalLogger log = ClientLogger.getLog();
@@ -162,37 +164,7 @@ public class MQClientInstance {
     }
 
 
-    public static ConcurrentMap<MessageQueue, String> topicRouteData2EndpointsForStaticTopic(final String topic, final TopicRouteData route) {
-        if (route.getTopicQueueMappingByBroker() == null
-            || route.getTopicQueueMappingByBroker().isEmpty()) {
-            return new ConcurrentHashMap<>();
-        }
-        ConcurrentMap<MessageQueue, String> mqEndPoints = new ConcurrentHashMap<>();
-        int totalNums = 0;
-        for (Map.Entry<String, TopicQueueMappingInfo> entry : route.getTopicQueueMappingByBroker().entrySet()) {
-            String brokerName = entry.getKey();
-            if (entry.getValue().getTotalQueues() > totalNums) {
-                if (totalNums != 0) {
-                    log.warn("The static logic queue totalNums dose not match before {} {} != {}", topic, totalNums, entry.getValue().getTotalQueues());
-                }
-                totalNums = entry.getValue().getTotalQueues();
-            }
-            for (Map.Entry<Integer, Integer> idEntry : entry.getValue().getCurrIdMap().entrySet()) {
-                int globalId = idEntry.getKey();
-                MessageQueue mq = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, globalId);
-                String oldBrokerName = mqEndPoints.put(mq, brokerName);
-                log.warn("The static logic queue is duplicated {} {} {} ", mq, oldBrokerName, brokerName);
-            }
-        }
-        //accomplish the static logic queues
-        for (int i = 0; i < totalNums; i++) {
-            MessageQueue mq = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, i);
-            if (!mqEndPoints.containsKey(mq)) {
-                mqEndPoints.put(mq, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST);
-            }
-        }
-        return mqEndPoints;
-    }
+
 
     public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
         TopicPublishInfo info = new TopicPublishInfo();
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
index 3944c18..3f691b5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
@@ -86,4 +86,11 @@ public class ResponseCode extends RemotingSysResponseCode {
     public static final int POLLING_TIMEOUT = 210;
 
     public static final int NOT_LEADER_FOR_QUEUE = 501;
+
+    public static final int RPC_UNKNOWN = -1000;
+    public static final int RPC_ADDR_IS_NULL = -1002;
+    public static final int RPC_SEND_TO_CHANNEL_FAILED = -1004;
+    public static final int RPC_TIME_OUT = -1006;
+
+
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java
index fea1736..acf4497 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java
@@ -20,7 +20,7 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
+import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
index e4226c2..2a577d7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
@@ -20,7 +20,7 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
+import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java
index 6889ae8..70189b7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java
@@ -20,7 +20,7 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
+import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
index 5407964..e15170f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
@@ -20,7 +20,7 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
+import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.annotation.CFNullable;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java
index e4e132e..b7714da 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java
@@ -20,7 +20,7 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
+import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java
index e3a4b1d..c8291d2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java
@@ -20,7 +20,7 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
+import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
index 0ec9795..808bc2d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
@@ -20,7 +20,7 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
+import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.annotation.CFNullable;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java
index e17fe36..11eccd5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java
@@ -20,7 +20,7 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
+import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
new file mode 100644
index 0000000..23fbc6f
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
@@ -0,0 +1,133 @@
+package org.apache.rocketmq.common.rpc;
+
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicQueueMappingInfo;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Client-side routing metadata: resolves the broker name that serves a message queue and the
+ * master address of a broker from the tables held by this instance.
+ */
+public class ClientMetadata {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
+
+    private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
+    private final ConcurrentMap<String/* Topic */, ConcurrentMap<MessageQueue, String/*brokerName*/>> topicEndPointsTable = new ConcurrentHashMap<String, ConcurrentMap<MessageQueue, String>>();
+    private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
+            new ConcurrentHashMap<String, HashMap<Long, String>>();
+    private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
+            new ConcurrentHashMap<String, HashMap<String, Integer>>();
+
+    /**
+     * Resolves the broker name that serves a message queue.
+     *
+     * @param mq the queue to resolve
+     * @return the broker name recorded in the endpoint table of the queue's topic when that table
+     *         exists and is non-empty (may be {@code null} if the queue has no entry in it);
+     *         otherwise the broker name carried by {@code mq} itself
+     */
+    public String getBrokerNameFromMessageQueue(final MessageQueue mq) {
+        // Read the per-topic table once so the emptiness check and the lookup see the same map.
+        final ConcurrentMap<MessageQueue, String> endPoints = topicEndPointsTable.get(mq.getTopic());
+        if (endPoints != null && !endPoints.isEmpty()) {
+            return endPoints.get(mq);
+        }
+        return mq.getBrokerName();
+    }
+
+    /**
+     * Records each broker's address table from {@code clusterInfo}. Entries already stored for
+     * brokers that are absent from {@code clusterInfo} are left untouched.
+     *
+     * @param clusterInfo the cluster data; ignored when {@code null} or without an address table
+     */
+    public void refreshClusterInfo(ClusterInfo clusterInfo) {
+        if (clusterInfo == null
+            || clusterInfo.getBrokerAddrTable() == null) {
+            return;
+        }
+        for (Map.Entry<String, BrokerData> entry : clusterInfo.getBrokerAddrTable().entrySet()) {
+            BrokerData brokerData = entry.getValue();
+            // ConcurrentHashMap rejects null values, so skip brokers that carry no address table.
+            if (brokerData == null || brokerData.getBrokerAddrs() == null) {
+                continue;
+            }
+            brokerAddrTable.put(entry.getKey(), brokerData.getBrokerAddrs());
+        }
+    }
+
+    /**
+     * Looks up the master address of a broker.
+     *
+     * @param brokerName the broker name
+     * @return the address stored under {@link MixAll#MASTER_ID}, or {@code null} when the broker is
+     *         unknown or has no entry for that id
+     */
+    public String findMasterBrokerAddr(String brokerName) {
+        // One lookup instead of containsKey followed by get.
+        final HashMap<Long, String> addrs = brokerAddrTable.get(brokerName);
+        if (addrs == null) {
+            return null;
+        }
+        return addrs.get(MixAll.MASTER_ID);
+    }
+
+    /**
+     * Builds the logic-queue to broker-name mapping of a static topic from its route data.
+     *
+     * <p>Every global queue id in a broker's current id map is mapped to that broker. Ids in
+     * {@code [0, totalNums)} that no broker reported, where {@code totalNums} is the largest
+     * total-queue count seen, are mapped to {@link MixAll#LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST}.
+     *
+     * @param topic the topic name
+     * @param route the route data of the topic
+     * @return the mapping; empty when the route carries no queue mapping
+     */
+    public static ConcurrentMap<MessageQueue, String> topicRouteData2EndpointsForStaticTopic(final String topic, final TopicRouteData route) {
+        if (route.getTopicQueueMappingByBroker() == null
+                || route.getTopicQueueMappingByBroker().isEmpty()) {
+            return new ConcurrentHashMap<MessageQueue, String>();
+        }
+        ConcurrentMap<MessageQueue, String> mqEndPoints = new ConcurrentHashMap<MessageQueue, String>();
+        int totalNums = 0;
+        for (Map.Entry<String, TopicQueueMappingInfo> entry : route.getTopicQueueMappingByBroker().entrySet()) {
+            String brokerName = entry.getKey();
+            if (entry.getValue().getTotalQueues() > totalNums) {
+                if (totalNums != 0) {
+                    log.warn("The static logic queue totalNums dose not match before {} {} != {}", topic, totalNums, entry.getValue().getTotalQueues());
+                }
+                totalNums = entry.getValue().getTotalQueues();
+            }
+            for (Map.Entry<Integer, Integer> idEntry : entry.getValue().getCurrIdMap().entrySet()) {
+                int globalId = idEntry.getKey();
+                MessageQueue mq = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, globalId);
+                String oldBrokerName = mqEndPoints.put(mq, brokerName);
+                // put() returns the previous mapping: only a non-null value means that another
+                // broker already claimed this queue, so warn in that case only.
+                if (oldBrokerName != null) {
+                    log.warn("The static logic queue is duplicated {} {} {} ", mq, oldBrokerName, brokerName);
+                }
+            }
+        }
+        // fill in the logic queues that no broker reported
+        for (int i = 0; i < totalNums; i++) {
+            MessageQueue mq = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, i);
+            if (!mqEndPoints.containsKey(mq)) {
+                mqEndPoints.put(mq, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST);
+            }
+        }
+        return mqEndPoints;
+    }
+
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/CommonRpcHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/CommonRpcHeader.java
new file mode 100644
index 0000000..bf58a57
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/CommonRpcHeader.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.rpc;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+
+/**
+ * Base header for rpc commands. It holds the fields common to the headers that extend it:
+ * namespace information, the name of the remote target and the oneway marker.
+ * All fields are boxed/reference types and stay null until a setter is called.
+ */
+public abstract class CommonRpcHeader implements CommandCustomHeader {
+    //the namespace name
+    protected String namespace;
+    //whether the data has already been namespaced
+    protected Boolean namespaced;
+
+    //the abstract remote addr name, usually the physical broker name
+    protected String bname;
+
+    //marks the request as oneway
+    protected Boolean oneway;
+
+    public String getBname() {
+        return bname;
+    }
+
+    public void setBname(String bname) {
+        this.bname = bname;
+    }
+
+    public Boolean getOneway() {
+        return oneway;
+    }
+
+    public void setOneway(Boolean oneway) {
+        this.oneway = oneway;
+    }
+
+    public String getNamespace() {
+        return namespace;
+    }
+
+    public void setNamespace(String namespace) {
+        this.namespace = namespace;
+    }
+
+    public Boolean getNamespaced() {
+        return namespaced;
+    }
+
+    public void setNamespaced(Boolean namespaced) {
+        this.namespaced = namespaced;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClient.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClient.java
new file mode 100644
index 0000000..e4de3e0
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClient.java
@@ -0,0 +1,19 @@
+package org.apache.rocketmq.common.rpc;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.concurrent.Future;
+
+/**
+ * Client-side entry point of the rpc layer: takes an {@link RpcRequest} and returns a future
+ * for the matching {@link RpcResponse}.
+ */
+public interface RpcClient {
+
+
+    //common invoke paradigm, the logical remote addr is defined in the "bname" field of the request header
+    //For a oneway request the flag is carried in the request, so no separate "invokeOneway" method is needed
+    Future<RpcResponse>  invoke(RpcRequest request, long timeoutMs) throws RpcException;
+
+    //For rocketmq, most requests correspond to a MessageQueue
+    //For a LogicQueue the broker name is mocked, so the physical addr can only be resolved from the MessageQueue
+    Future<RpcResponse>  invoke(MessageQueue mq, RpcRequest request, long timeoutMs) throws RpcException;
+
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientHook.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientHook.java
new file mode 100644
index 0000000..ca0f2d4
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientHook.java
@@ -0,0 +1,13 @@
+package org.apache.rocketmq.common.rpc;
+
+import java.util.concurrent.Future;
+
+/**
+ * Interception points that can be registered on an rpc client.
+ */
+public abstract class RpcClientHook {
+
+    //called with the request before it is sent
+    //if the return is not null, it is handed back to the caller and the request is not sent
+    public abstract RpcResponse beforeRequest(RpcRequest rpcRequest) throws RpcException;
+
+    //called with the response; if the return is not null, return it
+    //NOTE(review): RpcClientImpl.invoke does not appear to call this yet - confirm where it is wired in
+    public abstract RpcResponse afterResponse(RpcResponse rpcResponse) throws RpcException;
+
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
new file mode 100644
index 0000000..81d5d04
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
@@ -0,0 +1,218 @@
+package org.apache.rocketmq.common.rpc;
+
+import com.google.common.util.concurrent.Futures;
+import com.sun.org.apache.xpath.internal.functions.FuncPosition;
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.ImmediateEventExecutor;
+import io.netty.util.concurrent.Promise;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
+import org.apache.rocketmq.remoting.InvokeCallback;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.netty.ResponseFuture;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+
+public class RpcClientImpl implements RpcClient {
+
+    private ClientMetadata clientMetadata;
+
+    private RemotingClient remotingClient;
+
+    private List<RpcClientHook> clientHookList = new ArrayList<RpcClientHook>();
+
+    //Stores the metadata used to resolve broker names/addresses and the remoting client used to send commands
+    public RpcClientImpl(ClientMetadata clientMetadata, RemotingClient remotingClient) {
+        this.clientMetadata = clientMetadata;
+        this.remotingClient = remotingClient;
+    }
+
+    //Appends the hook to the hook list; invoke() consults the hooks in list order before sending a request
+    public void registerHook(RpcClientHook hook) {
+        clientHookList.add(hook);
+    }
+
+    //Resolves the broker name of the queue through the client metadata, writes it into the
+    //request header and then delegates to the header-addressed invoke
+    @Override
+    public Future<RpcResponse>  invoke(MessageQueue mq, RpcRequest request, long timeoutMs) throws RpcException {
+        final String resolvedBrokerName = clientMetadata.getBrokerNameFromMessageQueue(mq);
+        request.getHeader().setBname(resolvedBrokerName);
+        return invoke(request, timeoutMs);
+    }
+
+
+    //Creates a new, not yet completed promise from Netty's ImmediateEventExecutor singleton
+    public Promise<RpcResponse> createResponseFuture()  {
+        return ImmediateEventExecutor.INSTANCE.newPromise();
+    }
+
+    //Runs the registered hooks, resolves the master address from the "bname" of the request header
+    //and dispatches by request code. Only PULL_MESSAGE is supported here; any other code is rejected
+    //with REQUEST_CODE_NOT_SUPPORTED. Non-rpc exceptions are wrapped into an RpcException(RPC_UNKNOWN).
+    @Override
+    public Future<RpcResponse>  invoke(RpcRequest request, long timeoutMs) throws RpcException {
+        //a hook may answer the request itself; the first non-null answer wins
+        for (RpcClientHook hook : clientHookList) {
+            RpcResponse hookAnswer = hook.beforeRequest(request);
+            if (hookAnswer != null) {
+                //For 1.6, there is not easy-to-use future impl
+                return createResponseFuture().setSuccess(hookAnswer);
+            }
+        }
+        String addr = getBrokerAddrByNameOrException(request.getHeader().bname);
+        try {
+            if (request.getCode() == RequestCode.PULL_MESSAGE) {
+                return handlePullMessage(addr, request, timeoutMs);
+            }
+            throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getCode());
+        } catch (RpcException rpcException) {
+            //already in rpc form, pass it on untouched
+            throw rpcException;
+        } catch (Exception e) {
+            throw new RpcException(ResponseCode.RPC_UNKNOWN, "error from remoting layer", e);
+        }
+    }
+
+
+    private String getBrokerAddrByNameOrException(String bname) throws RpcException {
+        String addr = this.clientMetadata.findMasterBrokerAddr(bname);
+        if (addr == null) {
+            throw new RpcException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname);
+        }
+        return addr;
+    }
+
+
+    private void processFailedResponse(String addr, RemotingCommand requestCommand,  ResponseFuture responseFuture, Promise<RpcResponse> rpcResponsePromise) {
+        RemotingCommand responseCommand = responseFuture.getResponseCommand();
+        if (responseCommand != null) {
+            //this should not happen
+            return;
+        }
+        int errorCode = ResponseCode.RPC_UNKNOWN;
+        String errorMessage = null;
+        if (!responseFuture.isSendRequestOK()) {
+            errorCode = ResponseCode.RPC_SEND_TO_CHANNEL_FAILED;
+            errorMessage = "send request failed to " + addr + ". Request: " + requestCommand;
+        } else if (responseFuture.isTimeout()) {
+            errorCode = ResponseCode.RPC_TIME_OUT;
+            errorMessage = "wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + requestCommand;
+        } else {
+            errorMessage = "unknown reason. addr: " + addr + ", timeoutMillis: " + responseFuture.getTimeoutMillis() + ". Request: " + requestCommand;
+        }
+        rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(errorCode, errorMessage)));
+    }
+
+
+    /**
+     * Sends the pull request to the given address asynchronously and adapts the remoting callback
+     * to a promise.
+     * Failures are not reported by failing the promise: it is completed with an RpcResponse that
+     * carries an RpcException (missing response, unexpected response code, or a processing error).
+     *
+     * @param addr          the remote address the command is sent to
+     * @param rpcRequest    the request that is converted into a remoting command
+     * @param timeoutMillis timeout handed to the remoting client
+     * @return the promise that the callback completes
+     * @throws Exception whatever building the command or invokeAsync throws
+     */
+    public Promise<RpcResponse> handlePullMessage(final String addr, RpcRequest rpcRequest, long timeoutMillis)  throws Exception {
+        final RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
+
+        final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
+
+        InvokeCallback callback = new InvokeCallback() {
+            @Override
+            public void operationComplete(ResponseFuture responseFuture) {
+                RemotingCommand responseCommand = responseFuture.getResponseCommand();
+                if (responseCommand == null) {
+                    //no response at all: send failure, timeout or unknown reason
+                    processFailedResponse(addr, requestCommand, responseFuture, rpcResponsePromise);
+                    return;
+                }
+                try {
+                    switch (responseCommand.getCode()) {
+                        case ResponseCode.SUCCESS:
+                        case ResponseCode.PULL_NOT_FOUND:
+                        case ResponseCode.PULL_RETRY_IMMEDIATELY:
+                        case ResponseCode.PULL_OFFSET_MOVED:
+                            PullMessageResponseHeader responseHeader =
+                                    (PullMessageResponseHeader) responseCommand.decodeCommandCustomHeader(PullMessageResponseHeader.class);
+                            rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()));
+                            //must not fall through: completing the promise a second time in the default branch
+                            //throws IllegalStateException
+                            break;
+                        default:
+                            RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unexpected remote response code"));
+                            rpcResponsePromise.setSuccess(rpcResponse);
+                            break;
+                    }
+                } catch (Exception e) {
+                    String errorMessage = "process failed. addr: " + addr + ", timeoutMillis: " + responseFuture.getTimeoutMillis() + ". Request: " + requestCommand;
+                    RpcResponse  rpcResponse = new RpcResponse(new RpcException(ResponseCode.RPC_UNKNOWN, errorMessage, e));
+                    rpcResponsePromise.setSuccess(rpcResponse);
+                }
+            }
+        };
+
+        this.remotingClient.invokeAsync(addr, requestCommand, timeoutMillis, callback);
+        return rpcResponsePromise;
+    }
+
+    public RpcResponse handleSearchOffset(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
+        String addr = getBrokerAddrByNameOrException(bname);
+        RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
+        RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
+        assert responseCommand != null;
+        switch (responseCommand.getCode()) {
+            case ResponseCode.SUCCESS: {
+                SearchOffsetResponseHeader responseHeader =
+                        (SearchOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
+                return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
+            }
+            default:{
+                RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null);
+                rpcResponse.setException(new RpcException(responseCommand.getCode(), "unknown remote error"));
+                return rpcResponse;
+            }
+        }
+    }
+
+    public RpcResponse handleGetMinOffset(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
+        String addr = getBrokerAddrByNameOrException(bname);
+
+        RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
+
+        RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
+        assert responseCommand != null;
+        switch (responseCommand.getCode()) {
+            case ResponseCode.SUCCESS: {
+                GetMinOffsetResponseHeader responseHeader =
+                        (GetMinOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
+                return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
+            }
+            default:{
+                RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null);
+                rpcResponse.setException(new RpcException(responseCommand.getCode(), "unknown remote error"));
+                return rpcResponse;
+            }
+        }
+    }
+
+    public RpcResponse handleGetEarliestMsgStoretime(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
+        String addr = getBrokerAddrByNameOrException(bname);
+
+        RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
+
+        RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
+        assert responseCommand != null;
+        switch (responseCommand.getCode()) {
+            case ResponseCode.SUCCESS: {
+                GetEarliestMsgStoretimeResponseHeader responseHeader =
+                        (GetEarliestMsgStoretimeResponseHeader) responseCommand.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class);
+                return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
+
+            }
+            default:{
+                RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null);
+                rpcResponse.setException(new RpcException(responseCommand.getCode(), "unknown remote error"));
+                return rpcResponse;
+            }
+        }
+    }
+
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java
new file mode 100644
index 0000000..3ae06b7
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java
@@ -0,0 +1,39 @@
+package org.apache.rocketmq.common.rpc;
+
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Static helpers that convert rpc-layer requests/responses into remoting commands.
+ */
+public class RpcClientUtils {
+
+    //Builds a request command from the code and header of the rpc request and attaches the encoded body
+    public static RemotingCommand createCommandForRpcRequest(RpcRequest rpcRequest) {
+        RemotingCommand cmd = RemotingCommand.createRequestCommand(rpcRequest.getCode(), rpcRequest.getHeader());
+        cmd.setBody(encodeBody(rpcRequest.getBody()));
+        return cmd;
+    }
+
+    //Builds a response command from the code and header of the rpc response; the remark is the
+    //exception message, or an empty string when the response carries no exception
+    public static RemotingCommand createCommandForRpcResponse(RpcResponse rpcResponse) {
+        RemotingCommand cmd = RemotingCommand.createResponseCommand(rpcResponse.getCode(), rpcResponse.getHeader());
+        cmd.setRemark(rpcResponse.getException() == null ? "" : rpcResponse.getException().getMessage());
+        cmd.setBody(encodeBody(rpcResponse.getBody()));
+        return cmd;
+    }
+
+    /**
+     * Converts a body object into the byte array that is put on the wire.
+     *
+     * @param body null, a byte[], a RemotingSerializable or a ByteBuffer
+     * @return null for a null body, otherwise the encoded bytes; for a ByteBuffer the remaining
+     *         bytes are copied and the buffer's own position and mark are left untouched
+     * @throws RuntimeException if the body is of any other type
+     */
+    public static byte[] encodeBody(Object body) {
+        if (body == null) {
+            //a command without a body is legal; previously this ran into a NullPointerException below
+            return null;
+        }
+        if (body instanceof byte[]) {
+            return (byte[])body;
+        } else if (body instanceof RemotingSerializable) {
+            return ((RemotingSerializable) body).encode();
+        } else if (body instanceof ByteBuffer) {
+            //read through a duplicate so neither the position nor an existing mark of the caller's buffer changes
+            ByteBuffer view = ((ByteBuffer)body).duplicate();
+            byte[] data = new byte[view.remaining()];
+            view.get(data);
+            return data;
+        } else {
+            throw new RuntimeException("Unsupported body type " + body.getClass());
+        }
+    }
+}
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcException.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcException.java
new file mode 100644
index 0000000..fc096df
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcException.java
@@ -0,0 +1,24 @@
+package org.apache.rocketmq.common.rpc;
+
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+/**
+ * Exception of the rpc layer: a RemotingException that additionally carries a numeric error code.
+ */
+public class RpcException extends RemotingException {
+    //numeric error code, e.g. a ResponseCode constant or the code of a remote response
+    private int errorCode;
+    //creates the exception with an error code and a message
+    public RpcException(int errorCode, String message) {
+        super(message);
+        this.errorCode = errorCode;
+    }
+
+    //creates the exception with an error code, a message and the underlying cause
+    public RpcException(int errorCode, String message, Throwable cause) {
+        super(message, cause);
+        this.errorCode = errorCode;
+    }
+
+    public int getErrorCode() {
+        return errorCode;
+    }
+
+    public void setErrorCode(int errorCode) {
+        this.errorCode = errorCode;
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RpcRequest.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java
similarity index 80%
rename from remoting/src/main/java/org/apache/rocketmq/remoting/RpcRequest.java
rename to common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java
index e8e03d5..e1cf010 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RpcRequest.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java
@@ -14,14 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.remoting;
+package org.apache.rocketmq.common.rpc;
 
 public class RpcRequest {
     private int code;
-    private CommandCustomHeader header;
-    private byte[] body;
+    private CommonRpcHeader header;
+    private Object body;
 
-    public RpcRequest(int code, CommandCustomHeader header, byte[] body) {
+    public RpcRequest(int code, CommonRpcHeader header, Object body) {
         this.code = code;
         this.header = header;
         this.body = body;
@@ -31,11 +31,11 @@ public class RpcRequest {
         return code;
     }
 
-    public CommandCustomHeader getHeader() {
+    public CommonRpcHeader getHeader() {
         return header;
     }
 
-    public byte[] getBody() {
+    public Object getBody() {
         return body;
     }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RpcResponse.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
similarity index 73%
rename from remoting/src/main/java/org/apache/rocketmq/remoting/RpcResponse.java
rename to common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
index 9ae9950..601f942 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RpcResponse.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
@@ -14,13 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.remoting;
+package org.apache.rocketmq.common.rpc;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
 
 public class RpcResponse   {
     private int code;
     private CommandCustomHeader header;
-    private byte[] body;
-    public Exception exception;
+    private Object body;
+    public RpcException exception;
+
+    public RpcResponse() {
+
+    }
 
     public RpcResponse(int code, CommandCustomHeader header, byte[] body) {
         this.code = code;
@@ -28,6 +34,11 @@ public class RpcResponse   {
         this.body = body;
     }
 
+    public RpcResponse(RpcException rpcException) {
+        this.code = rpcException.getErrorCode();
+        this.exception = rpcException;
+    }
+
     public int getCode() {
         return code;
     }
@@ -36,15 +47,16 @@ public class RpcResponse   {
         return header;
     }
 
-    public byte[] getBody() {
+    public Object getBody() {
         return body;
     }
 
-    public Exception getException() {
+    public RpcException getException() {
         return exception;
     }
 
-    public void setException(Exception exception) {
+    public void setException(RpcException exception) {
         this.exception = exception;
     }
+
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java
similarity index 73%
rename from remoting/src/main/java/org/apache/rocketmq/remoting/RequestHeader.java
rename to common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java
index 109fb9e..897dfcb 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java
@@ -14,16 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.remoting;
+package org.apache.rocketmq.common.rpc;
 
-public abstract class RequestHeader implements  CommandCustomHeader {
+public abstract class TopicQueueRequestHeader extends CommonRpcHeader {
+    //Physical or Logical
     protected Boolean physical;
 
+    @Override
     public Boolean getPhysical() {
         return physical;
     }
 
+    @Override
     public void setPhysical(Boolean physical) {
         this.physical = physical;
     }
+
+    public abstract String getTopic();
+    public abstract void setTopic(String topic);
+    public abstract Integer getQueueId();
+    public abstract void setQueueId(Integer queueId);
+
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/TopicQueueRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/TopicQueueRequestHeader.java
deleted file mode 100644
index 08c3fef..0000000
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/TopicQueueRequestHeader.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.remoting;
-
-public abstract class TopicQueueRequestHeader extends   RequestHeader  {
-    public abstract String getTopic();
-    public abstract void setTopic(String topic);
-    public abstract Integer getQueueId();
-    public abstract void setQueueId(Integer queueId);
-
-}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index e061180..65ffaf5 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -20,8 +20,6 @@ import com.alibaba.fastjson.annotation.JSONField;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.RpcRequest;
-import org.apache.rocketmq.remoting.RpcResponse;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
@@ -88,6 +86,7 @@ public class RemotingCommand {
     protected RemotingCommand() {
     }
 
+
     public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
         RemotingCommand cmd = new RemotingCommand();
         cmd.setCode(code);
@@ -96,20 +95,11 @@ public class RemotingCommand {
         return cmd;
     }
 
-    public static RemotingCommand createCommandForRpcRequest(RpcRequest rpcRequest) {
-        RemotingCommand cmd = new RemotingCommand();
-        cmd.setCode(rpcRequest.getCode());
-        cmd.customHeader = rpcRequest.getHeader();
-        setCmdVersion(cmd);
-        cmd.setBody(rpcRequest.getBody());
-        return cmd;
-    }
-
-    public static RemotingCommand createCommandForRpcResponse(RpcResponse rpcResponse) {
+    public static RemotingCommand createResponseCommand(int code, CommandCustomHeader customHeader) {
         RemotingCommand cmd = new RemotingCommand();
+        cmd.setCode(code);
         cmd.markResponseType();
-        cmd.setCode(rpcResponse.getCode());
-        cmd.setRemark(rpcResponse.getException() == null ? "" : rpcResponse.getException().getMessage());
+        cmd.customHeader = customHeader;
         setCmdVersion(cmd);
         return cmd;
     }