You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/16 07:32:53 UTC
[rocketmq] branch 5.0.0-alpha-static-topic updated: Correct the invoke for PullMessageProcessor
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
new da12c9e Correct the invoke for PullMessageProcessor
da12c9e is described below
commit da12c9ed6056fc4515b99e1f86d4ac62797463a7
Author: dongeforever <do...@apache.org>
AuthorDate: Tue Nov 16 15:32:43 2021 +0800
Correct the invoke for PullMessageProcessor
---
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 19 +++++++++++++++++--
.../broker/processor/PullMessageProcessor.java | 6 ++++--
.../apache/rocketmq/common/rpc/CommonRpcHeader.java | 10 ++++++++++
.../apache/rocketmq/common/rpc/RequestBuilder.java | 2 ++
.../org/apache/rocketmq/common/rpc/RpcRequest.java | 8 +-------
5 files changed, 34 insertions(+), 11 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index cd8a179..384eef0 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -54,6 +54,9 @@ import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestH
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.rpc.ClientMetadata;
+import org.apache.rocketmq.common.rpc.RpcClient;
+import org.apache.rocketmq.common.rpc.RpcClientImpl;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.InvokeCallback;
@@ -81,15 +84,21 @@ public class BrokerOuterAPI {
private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true));
+ private ClientMetadata clientMetadata;
+ private RpcClient rpcClient;
+
public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, final BrokerController brokerController) {
- this(nettyClientConfig, null, brokerController);
+
+ this(nettyClientConfig, null, brokerController, new ClientMetadata());
}
- public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook, final BrokerController brokerController) {
+ private BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook, final BrokerController brokerController, ClientMetadata clientMetadata) {
this.remotingClient = new NettyRemotingClient(nettyClientConfig);
+ this.clientMetadata = clientMetadata;
this.remotingClient.registerRPCHook(rpcHook);
this.brokerController = brokerController;
this.currBrokerName = brokerController.getBrokerConfig().getBrokerName();
+ this.rpcClient = new RpcClientImpl(this.clientMetadata, this.remotingClient);
}
public void start() {
@@ -468,5 +477,11 @@ public class BrokerOuterAPI {
}
+ public ClientMetadata getClientMetadata() {
+ return clientMetadata;
+ }
+ public RpcClient getRpcClient() {
+ return rpcClient;
+ }
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index d8cd179..0cd4668 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -144,8 +144,10 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
}
requestHeader.setPhysical(true);
- RpcRequest rpcRequest = new RpcRequest(RequestCode.PULL_MESSAGE, requestHeader, null);
- RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().pullMessage(bname, rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout());
+ requestHeader.setBname(bname);
+ requestHeader.setCode(RequestCode.PULL_MESSAGE);
+ RpcRequest rpcRequest = new RpcRequest(requestHeader, null);
+ RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) {
throw rpcResponse.getException();
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/CommonRpcHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/CommonRpcHeader.java
index bf58a57..df62d52 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/CommonRpcHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/CommonRpcHeader.java
@@ -24,6 +24,8 @@ public abstract class CommonRpcHeader implements CommandCustomHeader {
//if the data has been namespaced
protected Boolean namespaced;
+ protected int code;
+
//the abstract remote addr name, usually the physical broker name
protected String bname;
@@ -60,4 +62,12 @@ public abstract class CommonRpcHeader implements CommandCustomHeader {
public void setNamespaced(Boolean namespaced) {
this.namespaced = namespaced;
}
+
+ public int getCode() {
+ return code;
+ }
+
+ public void setCode(int code) {
+ this.code = code;
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
index aa69b17..7655871 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
@@ -25,6 +25,7 @@ public class RequestBuilder {
}
try {
CommonRpcHeader requestHeader = (CommonRpcHeader) requestHeaderClass.newInstance();
+ requestHeader.setCode(requestCode);
requestHeader.setOneway(oneway);
requestHeader.setBname(destBrokerName);
return requestHeader;
@@ -52,6 +53,7 @@ public class RequestBuilder {
}
try {
TopicQueueRequestHeader requestHeader = (TopicQueueRequestHeader) requestHeaderClass.newInstance();
+ requestHeader.setCode(requestCode);
requestHeader.setOneway(oneway);
requestHeader.setBname(destBrokerName);
requestHeader.setTopic(topic);
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java
index e1cf010..06d5c79 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java
@@ -17,20 +17,14 @@
package org.apache.rocketmq.common.rpc;
public class RpcRequest {
- private int code;
private CommonRpcHeader header;
private Object body;
- public RpcRequest(int code, CommonRpcHeader header, Object body) {
- this.code = code;
+ public RpcRequest(CommonRpcHeader header, Object body) {
this.header = header;
this.body = body;
}
- public int getCode() {
- return code;
- }
-
public CommonRpcHeader getHeader() {
return header;
}