You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/11 08:18:24 UTC
[rocketmq] 02/02: Polish the rpc usage for PullProcessor
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 8d49c16df87e93bdf1e7a25a27716c559f6e34e4
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Nov 11 16:18:08 2021 +0800
Polish the rpc usage for PullProcessor
---
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 49 +++++-----------------
.../broker/processor/PullMessageProcessor.java | 24 +++++------
.../remoting/protocol/RemotingCommand.java | 34 +++++++++------
3 files changed, 43 insertions(+), 64 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index c5f8b90..5bb499e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -474,29 +474,17 @@ public class BrokerOuterAPI {
this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis, invokeCallback);
}
- public RemotingCommand pullMessage(String bname, PullMessageRequestHeader requestHeader, long timeoutMillis) throws Exception {
-
+ private String getBrokerAddrByNameOrException(String bname) throws MQBrokerException {
String addr = this.brokerController.getBrokerAddrByName(bname);
if (addr == null) {
- return RemotingCommand.buildErrorResponse(ResponseCode.SYSTEM_ERROR,
- String.format("%s-%d cannot find addr when forward to broker %s in broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), bname, currBrokerName),
- PullMessageResponseHeader.class);
+ throw new MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname, addr);
}
- RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
- RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
- assert response != null;
- return response;
-
+ return addr;
}
public RpcResponse pullMessage(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
- String addr = this.brokerController.getBrokerAddrByName(bname);
- if (addr == null) {
- RpcResponse rpcResponse = new RpcResponse(ResponseCode.SYSTEM_ERROR, null, null);
- rpcResponse.setException(new MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname, addr));
- return rpcResponse;
- }
- RemotingCommand requestCommand = RemotingCommand.createRequestCommand(rpcRequest );
+ String addr = getBrokerAddrByNameOrException(bname);
+ RemotingCommand requestCommand = RemotingCommand.createCommandForRpcRequest(rpcRequest);
RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
assert responseCommand != null;
@@ -516,13 +504,8 @@ public class BrokerOuterAPI {
}
public RpcResponse searchOffset(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
- String addr = this.brokerController.getBrokerAddrByName(bname);
- if (addr == null) {
- RpcResponse rpcResponse = new RpcResponse(ResponseCode.SYSTEM_ERROR, null, null);
- rpcResponse.setException(new MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname, addr));
- return rpcResponse;
- }
- RemotingCommand requestCommand = RemotingCommand.createRequestCommand(rpcRequest);
+ String addr = getBrokerAddrByNameOrException(bname);
+ RemotingCommand requestCommand = RemotingCommand.createCommandForRpcRequest(rpcRequest);
RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
assert responseCommand != null;
switch (responseCommand.getCode()) {
@@ -540,14 +523,9 @@ public class BrokerOuterAPI {
}
public RpcResponse getMinOffset(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
- String addr = this.brokerController.getBrokerAddrByName(bname);
- if (addr == null) {
- RpcResponse rpcResponse = new RpcResponse(ResponseCode.SYSTEM_ERROR, null, null);
- rpcResponse.setException(new MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname, addr));
- return rpcResponse;
- }
+ String addr = getBrokerAddrByNameOrException(bname);
- RemotingCommand requestCommand = RemotingCommand.createRequestCommand(rpcRequest);
+ RemotingCommand requestCommand = RemotingCommand.createCommandForRpcRequest(rpcRequest);
RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
assert responseCommand != null;
@@ -566,14 +544,9 @@ public class BrokerOuterAPI {
}
public RpcResponse getEarliestMsgStoretime(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
- String addr = this.brokerController.getBrokerAddrByName(bname);
- if (addr == null) {
- RpcResponse rpcResponse = new RpcResponse(ResponseCode.SYSTEM_ERROR, null, null);
- rpcResponse.setException(new MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname, addr));
- return rpcResponse;
- }
+ String addr = getBrokerAddrByNameOrException(bname);
- RemotingCommand requestCommand = RemotingCommand.createRequestCommand(rpcRequest);
+ RemotingCommand requestCommand = RemotingCommand.createCommandForRpcRequest(rpcRequest);
RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
assert responseCommand != null;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 79869b9..3f2a682 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -51,6 +51,7 @@ import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
@@ -65,6 +66,8 @@ import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RpcRequest;
+import org.apache.rocketmq.remoting.RpcResponse;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
@@ -141,27 +144,20 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
}
requestHeader.setPhysical(true);
- RemotingCommand response = this.brokerController.getBrokerOuterAPI().pullMessage(bname, requestHeader, this.brokerController.getBrokerConfig().getForwardTimeout());
- switch (response.getCode()) {
- case ResponseCode.SYSTEM_ERROR:
- return response;
- case ResponseCode.SUCCESS:
- case ResponseCode.PULL_NOT_FOUND:
- case ResponseCode.PULL_RETRY_IMMEDIATELY:
- case ResponseCode.PULL_OFFSET_MOVED:
- break;
- default:
- throw new MQBrokerException(response.getCode(), response.getRemark(), mappingItem.getBname());
+ RpcRequest rpcRequest = new RpcRequest(RequestCode.PULL_MESSAGE, requestHeader, null);
+ RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().pullMessage(bname, rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout());
+ if (rpcResponse.getException() != null) {
+ throw rpcResponse.getException();
}
- PullMessageResponseHeader responseHeader =
- (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
+
+ PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) rpcResponse.getHeader();
{
RemotingCommand rewriteResult = rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext);
if (rewriteResult != null) {
return rewriteResult;
}
}
- return response;
+ return RemotingCommand.createCommandForRpcResponse(rpcResponse);
} catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index 8fc3f9e..e061180 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -17,21 +17,22 @@
package org.apache.rocketmq.remoting.protocol;
import com.alibaba.fastjson.annotation.JSONField;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.RpcRequest;
+import org.apache.rocketmq.remoting.RpcResponse;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
-import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.RpcRequest;
-import org.apache.rocketmq.remoting.annotation.CFNotNull;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
public class RemotingCommand {
public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type";
@@ -87,7 +88,15 @@ public class RemotingCommand {
protected RemotingCommand() {
}
- public static RemotingCommand createRequestCommand(RpcRequest rpcRequest) {
+ public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
+ RemotingCommand cmd = new RemotingCommand();
+ cmd.setCode(code);
+ cmd.customHeader = customHeader;
+ setCmdVersion(cmd);
+ return cmd;
+ }
+
+ public static RemotingCommand createCommandForRpcRequest(RpcRequest rpcRequest) {
RemotingCommand cmd = new RemotingCommand();
cmd.setCode(rpcRequest.getCode());
cmd.customHeader = rpcRequest.getHeader();
@@ -96,10 +105,11 @@ public class RemotingCommand {
return cmd;
}
- public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
+ public static RemotingCommand createCommandForRpcResponse(RpcResponse rpcResponse) {
RemotingCommand cmd = new RemotingCommand();
- cmd.setCode(code);
- cmd.customHeader = customHeader;
+ cmd.markResponseType();
+ cmd.setCode(rpcResponse.getCode());
+ cmd.setRemark(rpcResponse.getException() == null ? "" : rpcResponse.getException().getMessage());
setCmdVersion(cmd);
return cmd;
}