You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/15 02:32:29 UTC
[rocketmq] branch 5.0.0-alpha-static-topic updated: Abstract the rpc layer
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
new 912613d Abstract the rpc layer
912613d is described below
commit 912613d2d3352ea0069b14250bbbabe58b15107e
Author: dongeforever <do...@apache.org>
AuthorDate: Mon Nov 15 10:32:11 2021 +0800
Abstract the rpc layer
---
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 100 +---------
.../broker/processor/AdminBrokerProcessor.java | 4 +-
.../broker/processor/PullMessageProcessor.java | 8 +-
.../broker/processor/SendMessageProcessor.java | 2 -
.../broker/topic/TopicQueueMappingManager.java | 5 +-
.../client/impl/factory/MQClientInstance.java | 34 +---
.../rocketmq/common/protocol/ResponseCode.java | 7 +
.../GetEarliestMsgStoretimeRequestHeader.java | 2 +-
.../protocol/header/GetMaxOffsetRequestHeader.java | 2 +-
.../protocol/header/GetMinOffsetRequestHeader.java | 2 +-
.../protocol/header/PullMessageRequestHeader.java | 2 +-
.../header/QueryConsumerOffsetRequestHeader.java | 2 +-
.../protocol/header/SearchOffsetRequestHeader.java | 2 +-
.../protocol/header/SendMessageRequestHeader.java | 2 +-
.../header/UpdateConsumerOffsetRequestHeader.java | 2 +-
.../apache/rocketmq/common/rpc/ClientMetadata.java | 86 ++++++++
.../rocketmq/common/rpc/CommonRpcHeader.java | 63 ++++++
.../org/apache/rocketmq/common/rpc/RpcClient.java | 19 ++
.../apache/rocketmq/common/rpc/RpcClientHook.java | 13 ++
.../apache/rocketmq/common/rpc/RpcClientImpl.java | 218 +++++++++++++++++++++
.../apache/rocketmq/common/rpc/RpcClientUtils.java | 39 ++++
.../apache/rocketmq/common/rpc/RpcException.java | 24 +++
.../apache/rocketmq/common/rpc}/RpcRequest.java | 12 +-
.../apache/rocketmq/common/rpc}/RpcResponse.java | 24 ++-
.../common/rpc/TopicQueueRequestHeader.java | 13 +-
.../rocketmq/remoting/TopicQueueRequestHeader.java | 25 ---
.../remoting/protocol/RemotingCommand.java | 18 +-
27 files changed, 528 insertions(+), 202 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 5bb499e..cd8a179 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -26,9 +26,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
-import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
@@ -45,15 +43,10 @@ import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
-import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader;
-import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
-import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
-import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
-import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader;
@@ -66,8 +59,8 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingClient;
-import org.apache.rocketmq.remoting.RpcRequest;
-import org.apache.rocketmq.remoting.RpcResponse;
+import org.apache.rocketmq.common.rpc.RpcRequest;
+import org.apache.rocketmq.common.rpc.RpcResponse;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -474,95 +467,6 @@ public class BrokerOuterAPI {
this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis, invokeCallback);
}
- private String getBrokerAddrByNameOrException(String bname) throws MQBrokerException {
- String addr = this.brokerController.getBrokerAddrByName(bname);
- if (addr == null) {
- throw new MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname, addr);
- }
- return addr;
- }
-
- public RpcResponse pullMessage(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
- String addr = getBrokerAddrByNameOrException(bname);
- RemotingCommand requestCommand = RemotingCommand.createCommandForRpcRequest(rpcRequest);
- RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
- assert responseCommand != null;
-
- switch (responseCommand.getCode()) {
- case ResponseCode.SUCCESS:
- case ResponseCode.PULL_NOT_FOUND:
- case ResponseCode.PULL_RETRY_IMMEDIATELY:
- case ResponseCode.PULL_OFFSET_MOVED:
- PullMessageResponseHeader responseHeader =
- (PullMessageResponseHeader) responseCommand.decodeCommandCustomHeader(PullMessageResponseHeader.class);
- return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
- default:
- RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null);
- rpcResponse.setException(new MQBrokerException(responseCommand.getCode(), "unknown remote error", addr));
- return rpcResponse;
- }
- }
-
- public RpcResponse searchOffset(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
- String addr = getBrokerAddrByNameOrException(bname);
- RemotingCommand requestCommand = RemotingCommand.createCommandForRpcRequest(rpcRequest);
- RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
- assert responseCommand != null;
- switch (responseCommand.getCode()) {
- case ResponseCode.SUCCESS: {
- SearchOffsetResponseHeader responseHeader =
- (SearchOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
- return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
- }
- default:{
- RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null);
- rpcResponse.setException(new MQBrokerException(responseCommand.getCode(), "unknown remote error", addr));
- return rpcResponse;
- }
- }
- }
-
- public RpcResponse getMinOffset(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
- String addr = getBrokerAddrByNameOrException(bname);
-
- RemotingCommand requestCommand = RemotingCommand.createCommandForRpcRequest(rpcRequest);
- RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
- assert responseCommand != null;
- switch (responseCommand.getCode()) {
- case ResponseCode.SUCCESS: {
- GetMinOffsetResponseHeader responseHeader =
- (GetMinOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
- return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
- }
- default:{
- RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null);
- rpcResponse.setException(new MQBrokerException(responseCommand.getCode(), "unknown remote error", addr));
- return rpcResponse;
- }
- }
- }
-
- public RpcResponse getEarliestMsgStoretime(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
- String addr = getBrokerAddrByNameOrException(bname);
-
- RemotingCommand requestCommand = RemotingCommand.createCommandForRpcRequest(rpcRequest);
-
- RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
- assert responseCommand != null;
- switch (responseCommand.getCode()) {
- case ResponseCode.SUCCESS: {
- GetEarliestMsgStoretimeResponseHeader responseHeader =
- (GetEarliestMsgStoretimeResponseHeader) responseCommand.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class);
- return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
-
- }
- default:{
- RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null);
- rpcResponse.setException(new MQBrokerException(responseCommand.getCode(), "unknown remote error", addr));
- return rpcResponse;
- }
- }
- }
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index ee7b31c..9f525f6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -119,8 +119,8 @@ import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.filter.util.BitsArray;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.remoting.RpcRequest;
-import org.apache.rocketmq.remoting.RpcResponse;
+import org.apache.rocketmq.common.rpc.RpcRequest;
+import org.apache.rocketmq.common.rpc.RpcResponse;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 3f2a682..d8cd179 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -36,7 +36,6 @@ import org.apache.rocketmq.broker.longpolling.PullRequest;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
-import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.LogicQueueMappingItem;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
@@ -60,14 +59,15 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.common.protocol.topic.OffsetMovedEvent;
+import org.apache.rocketmq.common.rpc.RpcClientUtils;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.remoting.RpcRequest;
-import org.apache.rocketmq.remoting.RpcResponse;
+import org.apache.rocketmq.common.rpc.RpcRequest;
+import org.apache.rocketmq.common.rpc.RpcResponse;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
@@ -157,7 +157,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
return rewriteResult;
}
}
- return RemotingCommand.createCommandForRpcResponse(rpcResponse);
+ return RpcClientUtils.createCommandForRpcResponse(rpcResponse);
} catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index baa1024..a2d2ace 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.broker.processor;
import java.net.SocketAddress;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -54,7 +53,6 @@ import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
-import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.RemotingResponseCallback;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index ae2e75d..ebcefe8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -28,14 +28,11 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper;
import org.apache.rocketmq.common.TopicQueueMappingDetail;
-import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
+import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import java.util.List;
-import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 950eae7..7818de9 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -87,6 +87,8 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import static org.apache.rocketmq.common.rpc.ClientMetadata.topicRouteData2EndpointsForStaticTopic;
+
public class MQClientInstance {
private final static long LOCK_TIMEOUT_MILLIS = 3000;
private final static InternalLogger log = ClientLogger.getLog();
@@ -162,37 +164,7 @@ public class MQClientInstance {
}
- public static ConcurrentMap<MessageQueue, String> topicRouteData2EndpointsForStaticTopic(final String topic, final TopicRouteData route) {
- if (route.getTopicQueueMappingByBroker() == null
- || route.getTopicQueueMappingByBroker().isEmpty()) {
- return new ConcurrentHashMap<>();
- }
- ConcurrentMap<MessageQueue, String> mqEndPoints = new ConcurrentHashMap<>();
- int totalNums = 0;
- for (Map.Entry<String, TopicQueueMappingInfo> entry : route.getTopicQueueMappingByBroker().entrySet()) {
- String brokerName = entry.getKey();
- if (entry.getValue().getTotalQueues() > totalNums) {
- if (totalNums != 0) {
- log.warn("The static logic queue totalNums dose not match before {} {} != {}", topic, totalNums, entry.getValue().getTotalQueues());
- }
- totalNums = entry.getValue().getTotalQueues();
- }
- for (Map.Entry<Integer, Integer> idEntry : entry.getValue().getCurrIdMap().entrySet()) {
- int globalId = idEntry.getKey();
- MessageQueue mq = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, globalId);
- String oldBrokerName = mqEndPoints.put(mq, brokerName);
- log.warn("The static logic queue is duplicated {} {} {} ", mq, oldBrokerName, brokerName);
- }
- }
- //accomplish the static logic queues
- for (int i = 0; i < totalNums; i++) {
- MessageQueue mq = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, i);
- if (!mqEndPoints.containsKey(mq)) {
- mqEndPoints.put(mq, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST);
- }
- }
- return mqEndPoints;
- }
+
public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
TopicPublishInfo info = new TopicPublishInfo();
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
index 3944c18..3f691b5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
@@ -86,4 +86,11 @@ public class ResponseCode extends RemotingSysResponseCode {
public static final int POLLING_TIMEOUT = 210;
public static final int NOT_LEADER_FOR_QUEUE = 501;
+
+ public static final int RPC_UNKNOWN = -1000;
+ public static final int RPC_ADDR_IS_NULL = -1002;
+ public static final int RPC_SEND_TO_CHANNEL_FAILED = -1004;
+ public static final int RPC_TIME_OUT = -1006;
+
+
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java
index fea1736..acf4497 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java
@@ -20,7 +20,7 @@
*/
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
+import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
index e4226c2..2a577d7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
@@ -20,7 +20,7 @@
*/
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
+import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java
index 6889ae8..70189b7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java
@@ -20,7 +20,7 @@
*/
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
+import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
index 5407964..e15170f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
@@ -20,7 +20,7 @@
*/
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
+import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java
index e4e132e..b7714da 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java
@@ -20,7 +20,7 @@
*/
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
+import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java
index e3a4b1d..c8291d2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java
@@ -20,7 +20,7 @@
*/
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
+import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
index 0ec9795..808bc2d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
@@ -20,7 +20,7 @@
*/
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
+import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java
index e17fe36..11eccd5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java
@@ -20,7 +20,7 @@
*/
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
+import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
new file mode 100644
index 0000000..23fbc6f
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
@@ -0,0 +1,86 @@
+package org.apache.rocketmq.common.rpc;
+
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicQueueMappingInfo;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class ClientMetadata {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
+
+ private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
+ private final ConcurrentMap<String/* Topic */, ConcurrentMap<MessageQueue, String/*brokerName*/>> topicEndPointsTable = new ConcurrentHashMap<String, ConcurrentMap<MessageQueue, String>>();
+ private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
+ new ConcurrentHashMap<String, HashMap<Long, String>>();
+ private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
+ new ConcurrentHashMap<String, HashMap<String, Integer>>();
+
+
+ public String getBrokerNameFromMessageQueue(final MessageQueue mq) {
+ if (topicEndPointsTable.get(mq.getTopic()) != null
+ && !topicEndPointsTable.get(mq.getTopic()).isEmpty()) {
+ return topicEndPointsTable.get(mq.getTopic()).get(mq);
+ }
+ return mq.getBrokerName();
+ }
+
+ public void refreshClusterInfo(ClusterInfo clusterInfo) {
+ if (clusterInfo == null
+ || clusterInfo.getBrokerAddrTable() == null) {
+ return;
+ }
+ for (Map.Entry<String, BrokerData> entry : clusterInfo.getBrokerAddrTable().entrySet()) {
+ brokerAddrTable.put(entry.getKey(), entry.getValue().getBrokerAddrs());
+ }
+ }
+
+ public String findMasterBrokerAddr(String brokerName) {
+ if (!brokerAddrTable.containsKey(brokerName)) {
+ return null;
+ }
+ return brokerAddrTable.get(brokerName).get(MixAll.MASTER_ID);
+ }
+
+ public static ConcurrentMap<MessageQueue, String> topicRouteData2EndpointsForStaticTopic(final String topic, final TopicRouteData route) {
+ if (route.getTopicQueueMappingByBroker() == null
+ || route.getTopicQueueMappingByBroker().isEmpty()) {
+ return new ConcurrentHashMap<MessageQueue, String>();
+ }
+ ConcurrentMap<MessageQueue, String> mqEndPoints = new ConcurrentHashMap<MessageQueue, String>();
+ int totalNums = 0;
+ for (Map.Entry<String, TopicQueueMappingInfo> entry : route.getTopicQueueMappingByBroker().entrySet()) {
+ String brokerName = entry.getKey();
+ if (entry.getValue().getTotalQueues() > totalNums) {
+ if (totalNums != 0) {
+ log.warn("The static logic queue totalNums dose not match before {} {} != {}", topic, totalNums, entry.getValue().getTotalQueues());
+ }
+ totalNums = entry.getValue().getTotalQueues();
+ }
+ for (Map.Entry<Integer, Integer> idEntry : entry.getValue().getCurrIdMap().entrySet()) {
+ int globalId = idEntry.getKey();
+ MessageQueue mq = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, globalId);
+ String oldBrokerName = mqEndPoints.put(mq, brokerName);
+ log.warn("The static logic queue is duplicated {} {} {} ", mq, oldBrokerName, brokerName);
+ }
+ }
+ //accomplish the static logic queues
+ for (int i = 0; i < totalNums; i++) {
+ MessageQueue mq = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, i);
+ if (!mqEndPoints.containsKey(mq)) {
+ mqEndPoints.put(mq, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST);
+ }
+ }
+ return mqEndPoints;
+ }
+
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/CommonRpcHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/CommonRpcHeader.java
new file mode 100644
index 0000000..bf58a57
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/CommonRpcHeader.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.rpc;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+
+public abstract class CommonRpcHeader implements CommandCustomHeader {
+ //the namespace name
+ protected String namespace;
+ //if the data has been namespaced
+ protected Boolean namespaced;
+
+ //the abstract remote addr name, usually the physical broker name
+ protected String bname;
+
+ protected Boolean oneway;
+
+ public String getBname() {
+ return bname;
+ }
+
+ public void setBname(String bname) {
+ this.bname = bname;
+ }
+
+ public Boolean getOneway() {
+ return oneway;
+ }
+
+ public void setOneway(Boolean oneway) {
+ this.oneway = oneway;
+ }
+
+ public String getNamespace() {
+ return namespace;
+ }
+
+ public void setNamespace(String namespace) {
+ this.namespace = namespace;
+ }
+
+ public Boolean getNamespaced() {
+ return namespaced;
+ }
+
+ public void setNamespaced(Boolean namespaced) {
+ this.namespaced = namespaced;
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClient.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClient.java
new file mode 100644
index 0000000..e4de3e0
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClient.java
@@ -0,0 +1,19 @@
+package org.apache.rocketmq.common.rpc;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.concurrent.Future;
+
+public interface RpcClient {
+
+
+ //common invoke paradigm, the logic remote addr is defined in "bname" field of request
+ //For oneway request, the sign is labeled in request, and do not need an another method named "invokeOneway"
+ //For one
+ Future<RpcResponse> invoke(RpcRequest request, long timeoutMs) throws RpcException;
+
+ //For rocketmq, most requests are corresponded to MessageQueue
+ //And for LogicQueue, the broker name is mocked, the physical addr could only be defined by MessageQueue
+ Future<RpcResponse> invoke(MessageQueue mq, RpcRequest request, long timeoutMs) throws RpcException;
+
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientHook.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientHook.java
new file mode 100644
index 0000000..ca0f2d4
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientHook.java
@@ -0,0 +1,13 @@
+package org.apache.rocketmq.common.rpc;
+
+import java.util.concurrent.Future;
+
+public abstract class RpcClientHook {
+
+ //if the return is not null, return it
+ public abstract RpcResponse beforeRequest(RpcRequest rpcRequest) throws RpcException;
+
+ //if the return is not null, return it
+ public abstract RpcResponse afterResponse(RpcResponse rpcResponse) throws RpcException;
+
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
new file mode 100644
index 0000000..81d5d04
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
@@ -0,0 +1,218 @@
+package org.apache.rocketmq.common.rpc;
+
+import com.google.common.util.concurrent.Futures;
+import com.sun.org.apache.xpath.internal.functions.FuncPosition;
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.ImmediateEventExecutor;
+import io.netty.util.concurrent.Promise;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
+import org.apache.rocketmq.remoting.InvokeCallback;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.netty.ResponseFuture;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+
+public class RpcClientImpl implements RpcClient {
+
+ private ClientMetadata clientMetadata;
+
+ private RemotingClient remotingClient;
+
+ private List<RpcClientHook> clientHookList = new ArrayList<RpcClientHook>();
+
+ public RpcClientImpl(ClientMetadata clientMetadata, RemotingClient remotingClient) {
+ this.clientMetadata = clientMetadata;
+ this.remotingClient = remotingClient;
+ }
+
+ public void registerHook(RpcClientHook hook) {
+ clientHookList.add(hook);
+ }
+
+ @Override
+ public Future<RpcResponse> invoke(MessageQueue mq, RpcRequest request, long timeoutMs) throws RpcException {
+ String bname = clientMetadata.getBrokerNameFromMessageQueue(mq);
+ request.getHeader().setBname(bname);
+ return invoke(request, timeoutMs);
+ }
+
+
+ public Promise<RpcResponse> createResponseFuture() {
+ return ImmediateEventExecutor.INSTANCE.newPromise();
+ }
+
+ @Override
+ public Future<RpcResponse> invoke(RpcRequest request, long timeoutMs) throws RpcException {
+ if (clientHookList.size() > 0) {
+ for (RpcClientHook rpcClientHook: clientHookList) {
+ RpcResponse response = rpcClientHook.beforeRequest(request);
+ if (response != null) {
+ //For 1.6, there is not easy-to-use future impl
+ return createResponseFuture().setSuccess(response);
+ }
+ }
+ }
+ String addr = getBrokerAddrByNameOrException(request.getHeader().bname);
+ Promise<RpcResponse> rpcResponsePromise = null;
+ try {
+ switch (request.getCode()) {
+ case RequestCode.PULL_MESSAGE:
+ rpcResponsePromise = handlePullMessage(addr, request, timeoutMs);
+ break;
+ default:
+ throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getCode());
+ }
+ } catch (RpcException rpcException) {
+ throw rpcException;
+ } catch (Exception e) {
+ throw new RpcException(ResponseCode.RPC_UNKNOWN, "error from remoting layer", e);
+ }
+ return rpcResponsePromise;
+ }
+
+
+ private String getBrokerAddrByNameOrException(String bname) throws RpcException {
+ String addr = this.clientMetadata.findMasterBrokerAddr(bname);
+ if (addr == null) {
+ throw new RpcException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname);
+ }
+ return addr;
+ }
+
+
+ private void processFailedResponse(String addr, RemotingCommand requestCommand, ResponseFuture responseFuture, Promise<RpcResponse> rpcResponsePromise) {
+ RemotingCommand responseCommand = responseFuture.getResponseCommand();
+ if (responseCommand != null) {
+ //this should not happen
+ return;
+ }
+ int errorCode = ResponseCode.RPC_UNKNOWN;
+ String errorMessage = null;
+ if (!responseFuture.isSendRequestOK()) {
+ errorCode = ResponseCode.RPC_SEND_TO_CHANNEL_FAILED;
+ errorMessage = "send request failed to " + addr + ". Request: " + requestCommand;
+ } else if (responseFuture.isTimeout()) {
+ errorCode = ResponseCode.RPC_TIME_OUT;
+ errorMessage = "wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + requestCommand;
+ } else {
+ errorMessage = "unknown reason. addr: " + addr + ", timeoutMillis: " + responseFuture.getTimeoutMillis() + ". Request: " + requestCommand;
+ }
+ rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(errorCode, errorMessage)));
+ }
+
+
+ public Promise<RpcResponse> handlePullMessage(final String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
+ final RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
+
+ final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
+
+ InvokeCallback callback = new InvokeCallback() {
+ @Override
+ public void operationComplete(ResponseFuture responseFuture) {
+ RemotingCommand responseCommand = responseFuture.getResponseCommand();
+ if (responseCommand == null) {
+ processFailedResponse(addr, requestCommand, responseFuture, rpcResponsePromise);
+ return;
+ }
+ try {
+ switch (responseCommand.getCode()) {
+ case ResponseCode.SUCCESS:
+ case ResponseCode.PULL_NOT_FOUND:
+ case ResponseCode.PULL_RETRY_IMMEDIATELY:
+ case ResponseCode.PULL_OFFSET_MOVED:
+ PullMessageResponseHeader responseHeader =
+ (PullMessageResponseHeader) responseCommand.decodeCommandCustomHeader(PullMessageResponseHeader.class);
+ rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()));
+ default:
+ RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unexpected remote response code"));
+ rpcResponsePromise.setSuccess(rpcResponse);
+
+ }
+ } catch (Exception e) {
+ String errorMessage = "process failed. addr: " + addr + ", timeoutMillis: " + responseFuture.getTimeoutMillis() + ". Request: " + requestCommand;
+ RpcResponse rpcResponse = new RpcResponse(new RpcException(ResponseCode.RPC_UNKNOWN, errorMessage, e));
+ rpcResponsePromise.setSuccess(rpcResponse);
+ }
+ }
+ };
+
+ this.remotingClient.invokeAsync(addr, requestCommand, timeoutMillis, callback);
+ return rpcResponsePromise;
+ }
+
+ public RpcResponse handleSearchOffset(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
+ String addr = getBrokerAddrByNameOrException(bname);
+ RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
+ RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
+ assert responseCommand != null;
+ switch (responseCommand.getCode()) {
+ case ResponseCode.SUCCESS: {
+ SearchOffsetResponseHeader responseHeader =
+ (SearchOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
+ return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
+ }
+ default:{
+ RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null);
+ rpcResponse.setException(new RpcException(responseCommand.getCode(), "unknown remote error"));
+ return rpcResponse;
+ }
+ }
+ }
+
+ public RpcResponse handleGetMinOffset(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
+ String addr = getBrokerAddrByNameOrException(bname);
+
+ RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
+
+ RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
+ assert responseCommand != null;
+ switch (responseCommand.getCode()) {
+ case ResponseCode.SUCCESS: {
+ GetMinOffsetResponseHeader responseHeader =
+ (GetMinOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
+ return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
+ }
+ default:{
+ RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null);
+ rpcResponse.setException(new RpcException(responseCommand.getCode(), "unknown remote error"));
+ return rpcResponse;
+ }
+ }
+ }
+
+ public RpcResponse handleGetEarliestMsgStoretime(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
+ String addr = getBrokerAddrByNameOrException(bname);
+
+ RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
+
+ RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
+ assert responseCommand != null;
+ switch (responseCommand.getCode()) {
+ case ResponseCode.SUCCESS: {
+ GetEarliestMsgStoretimeResponseHeader responseHeader =
+ (GetEarliestMsgStoretimeResponseHeader) responseCommand.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class);
+ return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
+
+ }
+ default:{
+ RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null);
+ rpcResponse.setException(new RpcException(responseCommand.getCode(), "unknown remote error"));
+ return rpcResponse;
+ }
+ }
+ }
+
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java
new file mode 100644
index 0000000..3ae06b7
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java
@@ -0,0 +1,39 @@
+package org.apache.rocketmq.common.rpc;
+
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.nio.ByteBuffer;
+
+public class RpcClientUtils {
+
+ public static RemotingCommand createCommandForRpcRequest(RpcRequest rpcRequest) {
+ RemotingCommand cmd = RemotingCommand.createRequestCommand(rpcRequest.getCode(), rpcRequest.getHeader());
+ cmd.setBody(encodeBody(rpcRequest.getBody()));
+ return cmd;
+ }
+
+ public static RemotingCommand createCommandForRpcResponse(RpcResponse rpcResponse) {
+ RemotingCommand cmd = RemotingCommand.createResponseCommand(rpcResponse.getCode(), rpcResponse.getHeader());
+ cmd.setRemark(rpcResponse.getException() == null ? "" : rpcResponse.getException().getMessage());
+ cmd.setBody(encodeBody(rpcResponse.getBody()));
+ return cmd;
+ }
+
+ public static byte[] encodeBody(Object body) {
+ if (body instanceof byte[]) {
+ return (byte[])body;
+ } else if (body instanceof RemotingSerializable) {
+ return ((RemotingSerializable) body).encode();
+ } else if (body instanceof ByteBuffer) {
+ ByteBuffer buffer = (ByteBuffer)body;
+ buffer.mark();
+ byte[] data = new byte[buffer.remaining()];
+ buffer.get(data);
+ buffer.reset();
+ return data;
+ } else {
+ throw new RuntimeException("Unsupported body type " + body.getClass());
+ }
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcException.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcException.java
new file mode 100644
index 0000000..fc096df
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcException.java
@@ -0,0 +1,24 @@
+package org.apache.rocketmq.common.rpc;
+
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+public class RpcException extends RemotingException {
+ private int errorCode;
+ public RpcException(int errorCode, String message) {
+ super(message);
+ this.errorCode = errorCode;
+ }
+
+ public RpcException(int errorCode, String message, Throwable cause) {
+ super(message, cause);
+ this.errorCode = errorCode;
+ }
+
+ public int getErrorCode() {
+ return errorCode;
+ }
+
+ public void setErrorCode(int errorCode) {
+ this.errorCode = errorCode;
+ }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RpcRequest.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java
similarity index 80%
rename from remoting/src/main/java/org/apache/rocketmq/remoting/RpcRequest.java
rename to common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java
index e8e03d5..e1cf010 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RpcRequest.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java
@@ -14,14 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.remoting;
+package org.apache.rocketmq.common.rpc;
public class RpcRequest {
private int code;
- private CommandCustomHeader header;
- private byte[] body;
+ private CommonRpcHeader header;
+ private Object body;
- public RpcRequest(int code, CommandCustomHeader header, byte[] body) {
+ public RpcRequest(int code, CommonRpcHeader header, Object body) {
this.code = code;
this.header = header;
this.body = body;
@@ -31,11 +31,11 @@ public class RpcRequest {
return code;
}
- public CommandCustomHeader getHeader() {
+ public CommonRpcHeader getHeader() {
return header;
}
- public byte[] getBody() {
+ public Object getBody() {
return body;
}
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RpcResponse.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
similarity index 73%
rename from remoting/src/main/java/org/apache/rocketmq/remoting/RpcResponse.java
rename to common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
index 9ae9950..601f942 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RpcResponse.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
@@ -14,13 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.remoting;
+package org.apache.rocketmq.common.rpc;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
public class RpcResponse {
private int code;
private CommandCustomHeader header;
- private byte[] body;
- public Exception exception;
+ private Object body;
+ public RpcException exception;
+
+ public RpcResponse() {
+
+ }
public RpcResponse(int code, CommandCustomHeader header, byte[] body) {
this.code = code;
@@ -28,6 +34,11 @@ public class RpcResponse {
this.body = body;
}
+ public RpcResponse(RpcException rpcException) {
+ this.code = rpcException.getErrorCode();
+ this.exception = rpcException;
+ }
+
public int getCode() {
return code;
}
@@ -36,15 +47,16 @@ public class RpcResponse {
return header;
}
- public byte[] getBody() {
+ public Object getBody() {
return body;
}
- public Exception getException() {
+ public RpcException getException() {
return exception;
}
- public void setException(Exception exception) {
+ public void setException(RpcException exception) {
this.exception = exception;
}
+
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java
similarity index 73%
rename from remoting/src/main/java/org/apache/rocketmq/remoting/RequestHeader.java
rename to common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java
index 109fb9e..897dfcb 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java
@@ -14,16 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.remoting;
+package org.apache.rocketmq.common.rpc;
-public abstract class RequestHeader implements CommandCustomHeader {
+public abstract class TopicQueueRequestHeader extends CommonRpcHeader {
+ //Physical or Logical
protected Boolean physical;
+ @Override
public Boolean getPhysical() {
return physical;
}
+ @Override
public void setPhysical(Boolean physical) {
this.physical = physical;
}
+
+ public abstract String getTopic();
+ public abstract void setTopic(String topic);
+ public abstract Integer getQueueId();
+ public abstract void setQueueId(Integer queueId);
+
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/TopicQueueRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/TopicQueueRequestHeader.java
deleted file mode 100644
index 08c3fef..0000000
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/TopicQueueRequestHeader.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.remoting;
-
-public abstract class TopicQueueRequestHeader extends RequestHeader {
- public abstract String getTopic();
- public abstract void setTopic(String topic);
- public abstract Integer getQueueId();
- public abstract void setQueueId(Integer queueId);
-
-}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index e061180..65ffaf5 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -20,8 +20,6 @@ import com.alibaba.fastjson.annotation.JSONField;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.RpcRequest;
-import org.apache.rocketmq.remoting.RpcResponse;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
@@ -88,6 +86,7 @@ public class RemotingCommand {
protected RemotingCommand() {
}
+
public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
RemotingCommand cmd = new RemotingCommand();
cmd.setCode(code);
@@ -96,20 +95,11 @@ public class RemotingCommand {
return cmd;
}
- public static RemotingCommand createCommandForRpcRequest(RpcRequest rpcRequest) {
- RemotingCommand cmd = new RemotingCommand();
- cmd.setCode(rpcRequest.getCode());
- cmd.customHeader = rpcRequest.getHeader();
- setCmdVersion(cmd);
- cmd.setBody(rpcRequest.getBody());
- return cmd;
- }
-
- public static RemotingCommand createCommandForRpcResponse(RpcResponse rpcResponse) {
+ public static RemotingCommand createResponseCommand(int code, CommandCustomHeader customHeader) {
RemotingCommand cmd = new RemotingCommand();
+ cmd.setCode(code);
cmd.markResponseType();
- cmd.setCode(rpcResponse.getCode());
- cmd.setRemark(rpcResponse.getException() == null ? "" : rpcResponse.getException().getMessage());
+ cmd.customHeader = customHeader;
setCmdVersion(cmd);
return cmd;
}