You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/11 08:18:24 UTC

[rocketmq] 02/02: Polish the rpc usage for PullProcessor

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 8d49c16df87e93bdf1e7a25a27716c559f6e34e4
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Nov 11 16:18:08 2021 +0800

    Polish the rpc usage for PullProcessor
---
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java | 49 +++++-----------------
 .../broker/processor/PullMessageProcessor.java     | 24 +++++------
 .../remoting/protocol/RemotingCommand.java         | 34 +++++++++------
 3 files changed, 43 insertions(+), 64 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index c5f8b90..5bb499e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -474,29 +474,17 @@ public class BrokerOuterAPI {
         this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis, invokeCallback);
     }
 
-    public RemotingCommand pullMessage(String bname, PullMessageRequestHeader requestHeader, long timeoutMillis) throws Exception {
-
+    private String getBrokerAddrByNameOrException(String bname) throws MQBrokerException {
         String addr = this.brokerController.getBrokerAddrByName(bname);
         if (addr == null) {
-            return RemotingCommand.buildErrorResponse(ResponseCode.SYSTEM_ERROR,
-                    String.format("%s-%d cannot find addr when forward to broker %s in broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), bname, currBrokerName),
-                    PullMessageResponseHeader.class);
+            throw new MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname, addr);
         }
-        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
-        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
-        assert response != null;
-        return response;
-
+        return addr;
     }
 
     public RpcResponse pullMessage(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
-        String addr = this.brokerController.getBrokerAddrByName(bname);
-        if (addr == null) {
-            RpcResponse rpcResponse = new RpcResponse(ResponseCode.SYSTEM_ERROR, null, null);
-            rpcResponse.setException(new MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname, addr));
-            return rpcResponse;
-        }
-        RemotingCommand requestCommand = RemotingCommand.createRequestCommand(rpcRequest );
+        String addr = getBrokerAddrByNameOrException(bname);
+        RemotingCommand requestCommand = RemotingCommand.createCommandForRpcRequest(rpcRequest);
         RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
         assert responseCommand != null;
 
@@ -516,13 +504,8 @@ public class BrokerOuterAPI {
     }
 
     public RpcResponse searchOffset(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
-        String addr = this.brokerController.getBrokerAddrByName(bname);
-        if (addr == null) {
-            RpcResponse rpcResponse = new RpcResponse(ResponseCode.SYSTEM_ERROR, null, null);
-            rpcResponse.setException(new MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname, addr));
-            return rpcResponse;
-        }
-        RemotingCommand requestCommand = RemotingCommand.createRequestCommand(rpcRequest);
+        String addr = getBrokerAddrByNameOrException(bname);
+        RemotingCommand requestCommand = RemotingCommand.createCommandForRpcRequest(rpcRequest);
         RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
         assert responseCommand != null;
         switch (responseCommand.getCode()) {
@@ -540,14 +523,9 @@ public class BrokerOuterAPI {
     }
 
     public RpcResponse getMinOffset(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
-        String addr = this.brokerController.getBrokerAddrByName(bname);
-        if (addr == null) {
-            RpcResponse rpcResponse = new RpcResponse(ResponseCode.SYSTEM_ERROR, null, null);
-            rpcResponse.setException(new MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname, addr));
-            return rpcResponse;
-        }
+        String addr = getBrokerAddrByNameOrException(bname);
 
-        RemotingCommand requestCommand = RemotingCommand.createRequestCommand(rpcRequest);
+        RemotingCommand requestCommand = RemotingCommand.createCommandForRpcRequest(rpcRequest);
 
         RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
         assert responseCommand != null;
@@ -566,14 +544,9 @@ public class BrokerOuterAPI {
     }
 
     public RpcResponse getEarliestMsgStoretime(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
-        String addr = this.brokerController.getBrokerAddrByName(bname);
-        if (addr == null) {
-            RpcResponse rpcResponse = new RpcResponse(ResponseCode.SYSTEM_ERROR, null, null);
-            rpcResponse.setException(new MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname, addr));
-            return rpcResponse;
-        }
+        String addr = getBrokerAddrByNameOrException(bname);
 
-        RemotingCommand requestCommand = RemotingCommand.createRequestCommand(rpcRequest);
+        RemotingCommand requestCommand = RemotingCommand.createCommandForRpcRequest(rpcRequest);
 
         RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
         assert responseCommand != null;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 79869b9..3f2a682 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -51,6 +51,7 @@ import org.apache.rocketmq.common.help.FAQUrl;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
@@ -65,6 +66,8 @@ import org.apache.rocketmq.common.sysflag.PullSysFlag;
 import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RpcRequest;
+import org.apache.rocketmq.remoting.RpcResponse;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
@@ -141,27 +144,20 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
             }
 
             requestHeader.setPhysical(true);
-            RemotingCommand response = this.brokerController.getBrokerOuterAPI().pullMessage(bname, requestHeader, this.brokerController.getBrokerConfig().getForwardTimeout());
-            switch (response.getCode()) {
-                case ResponseCode.SYSTEM_ERROR:
-                    return response;
-                case ResponseCode.SUCCESS:
-                case ResponseCode.PULL_NOT_FOUND:
-               case ResponseCode.PULL_RETRY_IMMEDIATELY:
-                case ResponseCode.PULL_OFFSET_MOVED:
-                    break;
-                default:
-                    throw new MQBrokerException(response.getCode(), response.getRemark(), mappingItem.getBname());
+            RpcRequest rpcRequest = new RpcRequest(RequestCode.PULL_MESSAGE, requestHeader, null);
+            RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().pullMessage(bname, rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout());
+            if (rpcResponse.getException() != null) {
+                throw rpcResponse.getException();
             }
-            PullMessageResponseHeader responseHeader =
-                    (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
+
+            PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) rpcResponse.getHeader();
             {
                 RemotingCommand rewriteResult =  rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext);
                 if (rewriteResult != null) {
                     return rewriteResult;
                 }
             }
-            return response;
+            return RemotingCommand.createCommandForRpcResponse(rpcResponse);
         } catch (Throwable t) {
             return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
         }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index 8fc3f9e..e061180 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -17,21 +17,22 @@
 package org.apache.rocketmq.remoting.protocol;
 
 import com.alibaba.fastjson.annotation.JSONField;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.RpcRequest;
+import org.apache.rocketmq.remoting.RpcResponse;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
-import java.lang.reflect.Type;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.RpcRequest;
-import org.apache.rocketmq.remoting.annotation.CFNotNull;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
 
 public class RemotingCommand {
     public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type";
@@ -87,7 +88,15 @@ public class RemotingCommand {
     protected RemotingCommand() {
     }
 
-    public static RemotingCommand createRequestCommand(RpcRequest rpcRequest) {
+    public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
+        RemotingCommand cmd = new RemotingCommand();
+        cmd.setCode(code);
+        cmd.customHeader = customHeader;
+        setCmdVersion(cmd);
+        return cmd;
+    }
+
+    public static RemotingCommand createCommandForRpcRequest(RpcRequest rpcRequest) {
         RemotingCommand cmd = new RemotingCommand();
         cmd.setCode(rpcRequest.getCode());
         cmd.customHeader = rpcRequest.getHeader();
@@ -96,10 +105,11 @@ public class RemotingCommand {
         return cmd;
     }
 
-    public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
+    public static RemotingCommand createCommandForRpcResponse(RpcResponse rpcResponse) {
         RemotingCommand cmd = new RemotingCommand();
-        cmd.setCode(code);
-        cmd.customHeader = customHeader;
+        cmd.markResponseType();
+        cmd.setCode(rpcResponse.getCode());
+        cmd.setRemark(rpcResponse.getException() == null ? "" : rpcResponse.getException().getMessage());
         setCmdVersion(cmd);
         return cmd;
     }