You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/03 07:34:56 UTC

[rocketmq] 01/03: Fix the max offset logic for logicOffset = -1

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 4f96f72c1c24b557ec5998a6985a353f64ec5f6c
Author: dongeforever <do...@apache.org>
AuthorDate: Fri Dec 3 15:12:11 2021 +0800

    Fix the max offset logic for logicOffset = -1
---
 .../broker/processor/AdminBrokerProcessor.java     | 53 ++++++++++++++++------
 .../broker/processor/PullMessageProcessor.java     |  6 +--
 .../broker/processor/SendMessageProcessor.java     |  2 +-
 .../apache/rocketmq/common/rpc/RpcClientImpl.java  | 25 ++++++++++
 .../common/statictopic/LogicQueueMappingItem.java  | 19 ++++----
 .../common/statictopic/TopicQueueMappingUtils.java |  3 --
 .../apache/rocketmq/tools/admin/MQAdminUtils.java  |  3 +-
 7 files changed, 78 insertions(+), 33 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 33d3025..a286818 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -633,12 +633,14 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
             long offset = -1;
             for (int i = 0; i < mappingItems.size(); i++) {
                 LogicQueueMappingItem item = mappingItems.get(i);
+                if (!item.checkIfLogicoffsetDecided()) {
+                    continue;
+                }
                 if (mappingDetail.getBname().equals(item.getBname())) {
-                    //means the leader
-                    assert i ==  mappingItems.size() - 1;
                     offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(mappingContext.getTopic(), item.getQueueId(), timestamp);
                     if (offset > 0) {
-                        offset = item.computeStaticQueueOffsetUpToEnd(offset);
+                        offset = item.computeStaticQueueOffsetStrictly(offset);
+                        break;
                     }
                 } else {
                     requestHeader.setPhysical(true);
@@ -650,12 +652,12 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
                     if (rpcResponse.getException() != null) {
                         throw rpcResponse.getException();
                     }
-                    SearchOffsetResponseHeader offsetResponseHeader = (SearchOffsetResponseHeader)rpcResponse.getHeader();
+                    SearchOffsetResponseHeader offsetResponseHeader = (SearchOffsetResponseHeader) rpcResponse.getHeader();
                     if (offsetResponseHeader.getOffset() < 0
                             || (item.checkIfEndOffsetDecided() && offsetResponseHeader.getOffset() >= item.getEndOffset())) {
                         continue;
                     } else {
-                        offset = item.computeStaticQueueOffsetUpToEnd(offsetResponseHeader.getOffset());
+                        offset = item.computeStaticQueueOffsetStrictly(offsetResponseHeader.getOffset());
                     }
 
                 }
@@ -705,17 +707,38 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         if (!mappingContext.isLeader()) {
             return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
         }
-        long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(mappingContext.getTopic(), mappingItem.getQueueId());
 
-        offset = mappingItem.computeStaticQueueOffsetUpToEnd(offset);
+        try {
+            LogicQueueMappingItem maxItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), Long.MAX_VALUE, true);
+            assert maxItem != null;
+            assert maxItem.getLogicOffset() >= 0;
+            requestHeader.setBname(maxItem.getBname());
+            requestHeader.setPhysical(true);
+            requestHeader.setQueueId(mappingItem.getQueueId());
 
+            long maxPhysicalOffset = Long.MAX_VALUE;
+            if (maxItem.getBname().equals(mappingDetail.getBname())) {
+                //current broker
+                maxPhysicalOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(mappingContext.getTopic(), mappingItem.getQueueId());
+            } else {
+                RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MAX_OFFSET, requestHeader, null);
+                RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
+                if (rpcResponse.getException() != null) {
+                    throw rpcResponse.getException();
+                }
+                GetMaxOffsetResponseHeader offsetResponseHeader = (GetMaxOffsetResponseHeader) rpcResponse.getHeader();
+                maxPhysicalOffset = offsetResponseHeader.getOffset();
+            }
 
-        final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class);
-        final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader();
-        responseHeader.setOffset(offset);
-        response.setCode(ResponseCode.SUCCESS);
-        response.setRemark(null);
-        return response;
+            final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class);
+            final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader();
+            responseHeader.setOffset(maxItem.computeStaticQueueOffsetStrictly(maxPhysicalOffset));
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+            return response;
+        } catch (Throwable t) {
+            return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
+        }
     }
 
     private RemotingCommand getMaxOffset(ChannelHandlerContext ctx,
@@ -770,7 +793,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
                 GetMinOffsetResponseHeader offsetResponseHeader = (GetMinOffsetResponseHeader) rpcResponse.getHeader();
                 physicalOffset = offsetResponseHeader.getOffset();
             }
-            long offset = mappingItem.computeStaticQueueOffsetUpToEnd(physicalOffset);
+            long offset = mappingItem.computeStaticQueueOffsetLoosely(physicalOffset);
 
             final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
             final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader();
@@ -820,7 +843,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         try {
             requestHeader.setBname(mappingItem.getBname());
             requestHeader.setPhysical(true);
-            RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null);
+            RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_EARLIEST_MSG_STORETIME, requestHeader, null);
             //TODO check if it is in current broker
             RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
             if (rpcResponse.getException() != null) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 6bac707..9b8135e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -274,11 +274,11 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
                     && nextBeginOffset >= currentItem.getEndOffset()) {
                 nextBeginOffset = currentItem.getEndOffset();
             }
-            responseHeader.setNextBeginOffset(currentItem.computeStaticQueueOffsetUpToEnd(nextBeginOffset));
+            responseHeader.setNextBeginOffset(currentItem.computeStaticQueueOffsetStrictly(nextBeginOffset));
             //handle min offset
-            responseHeader.setMinOffset(currentItem.computeStaticQueueOffsetUpToEnd(Math.max(currentItem.getStartOffset(), minOffset)));
+            responseHeader.setMinOffset(currentItem.computeStaticQueueOffsetStrictly(Math.max(currentItem.getStartOffset(), minOffset)));
             //handle max offset
-            responseHeader.setMaxOffset(Math.max(currentItem.computeStaticQueueOffsetUpToEnd(maxOffset),
+            responseHeader.setMaxOffset(Math.max(currentItem.computeStaticQueueOffsetStrictly(maxOffset),
                     TopicQueueMappingDetail.computeMaxOffsetFromMapping(mappingDetail, mappingContext.getGlobalId())));
             //set the offsetDelta
             responseHeader.setOffsetDelta(currentItem.computeOffsetDelta());
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index c229ce3..bdcba39 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -135,7 +135,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
                 return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
             }
             //no need to care the broker name
-            long staticLogicOffset = mappingItem.computeStaticQueueOffsetUpToEnd(responseHeader.getQueueOffset());
+            long staticLogicOffset = mappingItem.computeStaticQueueOffsetLoosely(responseHeader.getQueueOffset());
             if (staticLogicOffset < 0) {
                 return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
             }
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
index 713bbf9..47ffcc2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
@@ -7,6 +7,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader;
 import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
 import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
 import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
@@ -70,6 +71,9 @@ public class RpcClientImpl implements RpcClient {
                 case RequestCode.GET_MIN_OFFSET:
                     rpcResponsePromise = handleGetMinOffset(addr, request, timeoutMs);
                     break;
+                case RequestCode.GET_MAX_OFFSET:
+                    rpcResponsePromise = handleGetMaxOffset(addr, request, timeoutMs);
+                    break;
                 case RequestCode.SEARCH_OFFSET_BY_TIMESTAMP:
                     rpcResponsePromise = handleSearchOffset(addr, request, timeoutMs);
                     break;
@@ -226,6 +230,27 @@ public class RpcClientImpl implements RpcClient {
         return rpcResponsePromise;
     }
 
+    public Promise<RpcResponse> handleGetMaxOffset(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
+        final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
+
+        RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
+
+        RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
+        assert responseCommand != null;
+        switch (responseCommand.getCode()) {
+            case ResponseCode.SUCCESS: {
+                GetMaxOffsetResponseHeader responseHeader =
+                        (GetMaxOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(GetMaxOffsetResponseHeader.class);
+                rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()));
+                break;
+            }
+            default:{
+                rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
+            }
+        }
+        return rpcResponsePromise;
+    }
+
     public Promise<RpcResponse> handleGetEarliestMsgStoretime(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
         final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
index 9f79e9d..76e7406 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
@@ -31,7 +31,9 @@ public class LogicQueueMappingItem extends RemotingSerializable {
         this.timeOfEnd = timeOfEnd;
     }
 
-    public long computeStaticQueueOffsetUpToEnd(long physicalQueueOffset) {
+
+    //should only be user in sendMessage and getMinOffset
+    public long computeStaticQueueOffsetLoosely(long physicalQueueOffset) {
         //consider the newly mapped item
         if (logicOffset < 0) {
             return -1;
@@ -46,10 +48,9 @@ public class LogicQueueMappingItem extends RemotingSerializable {
         return  logicOffset + (physicalQueueOffset - startOffset);
     }
 
-    public long computeStaticQueueOffset(long physicalQueueOffset) {
-        if (logicOffset < 0) {
-            return logicOffset;
-        }
+    public long computeStaticQueueOffsetStrictly(long physicalQueueOffset) {
+        assert logicOffset >= 0;
+
         if (physicalQueueOffset < startOffset) {
             return logicOffset;
         }
@@ -67,15 +68,15 @@ public class LogicQueueMappingItem extends RemotingSerializable {
             return logicOffset;
         }
     }
-    public boolean checkIfShouldDeleted() {
-        return endOffset == startOffset;
-    }
-
     public boolean checkIfEndOffsetDecided() {
         //if the endOffset == startOffset, then the item should be deleted
         return endOffset > startOffset;
     }
 
+    public boolean checkIfLogicoffsetDecided() {
+        return logicOffset >= 0;
+    }
+
     public long computeOffsetDelta() {
         return logicOffset - startOffset;
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index e56d585..8aa1574 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -632,9 +632,6 @@ public class TopicQueueMappingUtils {
             if (ignoreNegative && item.getLogicOffset() < 0) {
                 continue;
             }
-            if (!item.checkIfShouldDeleted()) {
-                return mappingItems.get(i);
-            }
         }
         return null;
     }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
index 288bac4..eed6b76 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
@@ -16,7 +16,6 @@ import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
 import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
-import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
@@ -165,7 +164,7 @@ public class MQAdminUtils {
                 if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) {
                     throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset());
                 }
-                newLeader.setLogicOffset(TopicQueueMappingUtils.blockSeqRoundUp(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset()), blockSeqSize));
+                newLeader.setLogicOffset(TopicQueueMappingUtils.blockSeqRoundUp(oldLeader.computeStaticQueueOffsetStrictly(topicOffset.getMaxOffset()), blockSeqSize));
                 TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(newLeader.getBname());
                 //fresh the new leader
                 TopicQueueMappingDetail.putMappingInfo(mapInConfig.getMappingDetail(), globalId, items);