You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/08 09:28:16 UTC

[rocketmq] 01/02: Try using the new style to handle get min offset

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 8b747f97f99bf40a6b0d6b813fb91b5f1f673c67
Author: dongeforever <do...@apache.org>
AuthorDate: Wed Dec 8 10:58:47 2021 +0800

    Try using the new style to handle get min offset
---
 .../broker/processor/AdminBrokerProcessor.java     | 54 +++++++++++++---------
 .../apache/rocketmq/common/rpc/RpcResponse.java    |  2 +-
 2 files changed, 32 insertions(+), 24 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 270c953..cca96f1 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -20,6 +20,7 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.util.concurrent.CompleteFuture;
 import org.apache.rocketmq.acl.AccessValidator;
 import org.apache.rocketmq.acl.plain.PlainAccessValidator;
 import org.apache.rocketmq.broker.BrokerController;
@@ -103,6 +104,8 @@ import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHead
 import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader;
 import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.rpc.RpcClientUtils;
+import org.apache.rocketmq.common.rpc.RpcException;
 import org.apache.rocketmq.common.rpc.RpcRequest;
 import org.apache.rocketmq.common.rpc.RpcResponse;
 import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
@@ -146,7 +149,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
 
 import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
 
@@ -764,15 +770,15 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         return response;
     }
 
-    private RemotingCommand rewriteRequestForStaticTopic(GetMinOffsetRequestHeader requestHeader, TopicQueueMappingContext mappingContext)  {
+    private CompletableFuture<RpcResponse> handleGetMinOffsetForStaticTopic(GetMinOffsetRequestHeader requestHeader, TopicQueueMappingContext mappingContext)  {
         if (mappingContext.getMappingDetail() == null) {
             return null;
         }
-
         TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
         if (!mappingContext.isLeader()) {
             //this may not
-            return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
+            return CompletableFuture.completedFuture(new RpcResponse(new RpcException(ResponseCode.NOT_LEADER_FOR_QUEUE,
+                    String.format("%s-%d is not leader in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()))));
         };
 
         LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), 0L, true);
@@ -796,38 +802,40 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
             }
             long offset = mappingItem.computeStaticQueueOffsetLoosely(physicalOffset);
 
-            final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
-            final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader();
+            final GetMinOffsetResponseHeader responseHeader = new GetMinOffsetResponseHeader();
             responseHeader.setOffset(offset);
-            response.setCode(ResponseCode.SUCCESS);
-            response.setRemark(null);
-            return response;
+            return CompletableFuture.completedFuture(new RpcResponse(ResponseCode.SUCCESS, responseHeader, null));
         } catch (Throwable t) {
             log.error("rewriteRequestForStaticTopic failed", t);
-            return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
+            return CompletableFuture.completedFuture(new RpcResponse(new RpcException(ResponseCode.SYSTEM_ERROR, t.getMessage(), t)));
         }
     }
 
-    private RemotingCommand getMinOffset(ChannelHandlerContext ctx,
-        RemotingCommand request) throws RemotingCommandException {
-        final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
-        final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader();
-        final GetMinOffsetRequestHeader requestHeader =
-            (GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
-
-
+    private CompletableFuture<RpcResponse>  handleGetMinOffset(RpcRequest request) {
+        assert request.getCode() == RequestCode.GET_MIN_OFFSET;
+        GetMinOffsetRequestHeader requestHeader = (GetMinOffsetRequestHeader) request.getHeader();
         TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false);
-        RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
+        CompletableFuture<RpcResponse> rewriteResult = handleGetMinOffsetForStaticTopic(requestHeader, mappingContext);
         if (rewriteResult != null) {
             return rewriteResult;
         }
-
+        final GetMinOffsetResponseHeader responseHeader = new GetMinOffsetResponseHeader();
         long offset = this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId());
-
         responseHeader.setOffset(offset);
-        response.setCode(ResponseCode.SUCCESS);
-        response.setRemark(null);
-        return response;
+        return CompletableFuture.completedFuture(new RpcResponse(ResponseCode.SUCCESS, responseHeader, null));
+    }
+
+    private RemotingCommand getMinOffset(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+        final GetMinOffsetRequestHeader requestHeader =
+            (GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
+        try {
+            CompletableFuture<RpcResponse> responseFuture = handleGetMinOffset(new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null));
+            RpcResponse  rpcResponse = responseFuture.get();
+            return RpcClientUtils.createCommandForRpcResponse(rpcResponse);
+        } catch (Throwable t) {
+            return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
+        }
     }
 
     private RemotingCommand rewriteRequestForStaticTopic(GetEarliestMsgStoretimeRequestHeader requestHeader, TopicQueueMappingContext mappingContext)  {
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
index 2f61329..5155bd2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
@@ -34,7 +34,7 @@ public class RpcResponse   {
         this.body = body;
     }
 
-    RpcResponse(RpcException rpcException) {
+    public RpcResponse(RpcException rpcException) {
         this.code = rpcException.getErrorCode();
         this.exception = rpcException;
     }