You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/16 07:32:53 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Correct the invoke for PullMessageProcessor

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new da12c9e  Correct the invoke for PullMessageProcessor
da12c9e is described below

commit da12c9ed6056fc4515b99e1f86d4ac62797463a7
Author: dongeforever <do...@apache.org>
AuthorDate: Tue Nov 16 15:32:43 2021 +0800

    Correct the invoke for PullMessageProcessor
---
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java    | 19 +++++++++++++++++--
 .../broker/processor/PullMessageProcessor.java        |  6 ++++--
 .../apache/rocketmq/common/rpc/CommonRpcHeader.java   | 10 ++++++++++
 .../apache/rocketmq/common/rpc/RequestBuilder.java    |  2 ++
 .../org/apache/rocketmq/common/rpc/RpcRequest.java    |  8 +-------
 5 files changed, 34 insertions(+), 11 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index cd8a179..384eef0 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -54,6 +54,9 @@ import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestH
 import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.rpc.ClientMetadata;
+import org.apache.rocketmq.common.rpc.RpcClient;
+import org.apache.rocketmq.common.rpc.RpcClientImpl;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.InvokeCallback;
@@ -81,15 +84,21 @@ public class BrokerOuterAPI {
     private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
         new ArrayBlockingQueue<Runnable>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true));
 
+    private ClientMetadata clientMetadata;
+    private RpcClient rpcClient;
+
     public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, final BrokerController brokerController) {
-        this(nettyClientConfig, null, brokerController);
+
+        this(nettyClientConfig, null, brokerController, new ClientMetadata());
     }
 
-    public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook, final BrokerController brokerController) {
+    private BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook, final BrokerController brokerController, ClientMetadata clientMetadata) {
         this.remotingClient = new NettyRemotingClient(nettyClientConfig);
+        this.clientMetadata = clientMetadata;
         this.remotingClient.registerRPCHook(rpcHook);
         this.brokerController = brokerController;
         this.currBrokerName =  brokerController.getBrokerConfig().getBrokerName();
+        this.rpcClient = new RpcClientImpl(this.clientMetadata, this.remotingClient);
     }
 
     public void start() {
@@ -468,5 +477,11 @@ public class BrokerOuterAPI {
     }
 
 
+    public ClientMetadata getClientMetadata() {
+        return clientMetadata;
+    }
 
+    public RpcClient getRpcClient() {
+        return rpcClient;
+    }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index d8cd179..0cd4668 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -144,8 +144,10 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
             }
 
             requestHeader.setPhysical(true);
-            RpcRequest rpcRequest = new RpcRequest(RequestCode.PULL_MESSAGE, requestHeader, null);
-            RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().pullMessage(bname, rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout());
+            requestHeader.setBname(bname);
+            requestHeader.setCode(RequestCode.PULL_MESSAGE);
+            RpcRequest rpcRequest = new RpcRequest(requestHeader, null);
+            RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
             if (rpcResponse.getException() != null) {
                 throw rpcResponse.getException();
             }
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/CommonRpcHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/CommonRpcHeader.java
index bf58a57..df62d52 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/CommonRpcHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/CommonRpcHeader.java
@@ -24,6 +24,8 @@ public abstract class CommonRpcHeader implements CommandCustomHeader {
     //if the data has been namespaced
     protected Boolean namespaced;
 
+    protected int code;
+
     //the abstract remote addr name, usually the physical broker name
     protected String bname;
 
@@ -60,4 +62,12 @@ public abstract class CommonRpcHeader implements CommandCustomHeader {
     public void setNamespaced(Boolean namespaced) {
         this.namespaced = namespaced;
     }
+
+    public int getCode() {
+        return code;
+    }
+
+    public void setCode(int code) {
+        this.code = code;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
index aa69b17..7655871 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
@@ -25,6 +25,7 @@ public class RequestBuilder {
         }
         try {
             CommonRpcHeader requestHeader = (CommonRpcHeader) requestHeaderClass.newInstance();
+            requestHeader.setCode(requestCode);
             requestHeader.setOneway(oneway);
             requestHeader.setBname(destBrokerName);
             return requestHeader;
@@ -52,6 +53,7 @@ public class RequestBuilder {
         }
         try {
             TopicQueueRequestHeader requestHeader = (TopicQueueRequestHeader) requestHeaderClass.newInstance();
+            requestHeader.setCode(requestCode);
             requestHeader.setOneway(oneway);
             requestHeader.setBname(destBrokerName);
             requestHeader.setTopic(topic);
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java
index e1cf010..06d5c79 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java
@@ -17,20 +17,14 @@
 package org.apache.rocketmq.common.rpc;
 
 public class RpcRequest {
-    private int code;
     private CommonRpcHeader header;
     private Object body;
 
-    public RpcRequest(int code, CommonRpcHeader header, Object body) {
-        this.code = code;
+    public RpcRequest(CommonRpcHeader header, Object body) {
         this.header = header;
         this.body = body;
     }
 
-    public int getCode() {
-        return code;
-    }
-
     public CommonRpcHeader getHeader() {
         return header;
     }