You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/08 09:28:16 UTC
[rocketmq] 01/02: Try using the new style to handble get min offset
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 8b747f97f99bf40a6b0d6b813fb91b5f1f673c67
Author: dongeforever <do...@apache.org>
AuthorDate: Wed Dec 8 10:58:47 2021 +0800
Try using the new style to handble get min offset
---
.../broker/processor/AdminBrokerProcessor.java | 54 +++++++++++++---------
.../apache/rocketmq/common/rpc/RpcResponse.java | 2 +-
2 files changed, 32 insertions(+), 24 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 270c953..cca96f1 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -20,6 +20,7 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.util.concurrent.CompleteFuture;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.acl.plain.PlainAccessValidator;
import org.apache.rocketmq.broker.BrokerController;
@@ -103,6 +104,8 @@ import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHead
import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader;
import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.rpc.RpcClientUtils;
+import org.apache.rocketmq.common.rpc.RpcException;
import org.apache.rocketmq.common.rpc.RpcRequest;
import org.apache.rocketmq.common.rpc.RpcResponse;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
@@ -146,7 +149,10 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
@@ -764,15 +770,15 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
return response;
}
- private RemotingCommand rewriteRequestForStaticTopic(GetMinOffsetRequestHeader requestHeader, TopicQueueMappingContext mappingContext) {
+ private CompletableFuture<RpcResponse> handleGetMinOffsetForStaticTopic(GetMinOffsetRequestHeader requestHeader, TopicQueueMappingContext mappingContext) {
if (mappingContext.getMappingDetail() == null) {
return null;
}
-
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
if (!mappingContext.isLeader()) {
//this may not
- return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
+ return CompletableFuture.completedFuture(new RpcResponse(new RpcException(ResponseCode.NOT_LEADER_FOR_QUEUE,
+ String.format("%s-%d is not leader in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()))));
};
LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), 0L, true);
@@ -796,38 +802,40 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
}
long offset = mappingItem.computeStaticQueueOffsetLoosely(physicalOffset);
- final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
- final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader();
+ final GetMinOffsetResponseHeader responseHeader = new GetMinOffsetResponseHeader();
responseHeader.setOffset(offset);
- response.setCode(ResponseCode.SUCCESS);
- response.setRemark(null);
- return response;
+ return CompletableFuture.completedFuture(new RpcResponse(ResponseCode.SUCCESS, responseHeader, null));
} catch (Throwable t) {
log.error("rewriteRequestForStaticTopic failed", t);
- return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
+ return CompletableFuture.completedFuture(new RpcResponse(new RpcException(ResponseCode.SYSTEM_ERROR, t.getMessage(), t)));
}
}
- private RemotingCommand getMinOffset(ChannelHandlerContext ctx,
- RemotingCommand request) throws RemotingCommandException {
- final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
- final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader();
- final GetMinOffsetRequestHeader requestHeader =
- (GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
-
-
+ private CompletableFuture<RpcResponse> handleGetMinOffset(RpcRequest request) {
+ assert request.getCode() == RequestCode.GET_MIN_OFFSET;
+ GetMinOffsetRequestHeader requestHeader = (GetMinOffsetRequestHeader) request.getHeader();
TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false);
- RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
+ CompletableFuture<RpcResponse> rewriteResult = handleGetMinOffsetForStaticTopic(requestHeader, mappingContext);
if (rewriteResult != null) {
return rewriteResult;
}
-
+ final GetMinOffsetResponseHeader responseHeader = new GetMinOffsetResponseHeader();
long offset = this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId());
-
responseHeader.setOffset(offset);
- response.setCode(ResponseCode.SUCCESS);
- response.setRemark(null);
- return response;
+ return CompletableFuture.completedFuture(new RpcResponse(ResponseCode.SUCCESS, responseHeader, null));
+ }
+
+ private RemotingCommand getMinOffset(ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
+ final GetMinOffsetRequestHeader requestHeader =
+ (GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
+ try {
+ CompletableFuture<RpcResponse> responseFuture = handleGetMinOffset(new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null));
+ RpcResponse rpcResponse = responseFuture.get();
+ return RpcClientUtils.createCommandForRpcResponse(rpcResponse);
+ } catch (Throwable t) {
+ return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
+ }
}
private RemotingCommand rewriteRequestForStaticTopic(GetEarliestMsgStoretimeRequestHeader requestHeader, TopicQueueMappingContext mappingContext) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
index 2f61329..5155bd2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
@@ -34,7 +34,7 @@ public class RpcResponse {
this.body = body;
}
- RpcResponse(RpcException rpcException) {
+ public RpcResponse(RpcException rpcException) {
this.code = rpcException.getErrorCode();
this.exception = rpcException;
}