You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/16 07:55:12 UTC
[rocketmq] 01/02: Finish the processor
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 86b3ff7b102eeba4f12c837f64c90bb033d90717
Author: dongeforever <do...@apache.org>
AuthorDate: Tue Nov 16 15:40:28 2021 +0800
Finish the processor
---
.../broker/processor/AdminBrokerProcessor.java | 22 ++++++++++++++++------
1 file changed, 16 insertions(+), 6 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 9f525f6..0a8f58c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -646,8 +646,10 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
requestHeader.setPhysical(true);
requestHeader.setTimestamp(timestamp);
requestHeader.setQueueId(item.getQueueId());
- RpcRequest rpcRequest = new RpcRequest(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader, null);
- RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().searchOffset(item.getBname(), rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout());
+ requestHeader.setCode(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP);
+ requestHeader.setBname(item.getBname());
+ RpcRequest rpcRequest = new RpcRequest(requestHeader, null);
+ RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) {
throw rpcResponse.getException();
}
@@ -751,8 +753,12 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
};
try {
- RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null);
- RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getMinOffset(mappingItem.getBname(), rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout());
+ requestHeader.setCode(RequestCode.GET_MIN_OFFSET);
+ requestHeader.setBname(mappingItem.getBname());
+ requestHeader.setPhysical(true);
+ //TODO check if it is leader
+ RpcRequest rpcRequest = new RpcRequest(requestHeader, null);
+ RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) {
throw rpcResponse.getException();
}
@@ -802,8 +808,12 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
};
try {
- RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null);
- RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getEarliestMsgStoretime(mappingItem.getBname(), rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout());
+ requestHeader.setCode(RequestCode.GET_MIN_OFFSET);
+ requestHeader.setBname(mappingItem.getBname());
+ requestHeader.setPhysical(true);
+ RpcRequest rpcRequest = new RpcRequest(requestHeader, null);
+ //TODO check if it is leader
+ RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) {
throw rpcResponse.getException();
}