You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/02 10:58:18 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Polish the resetZero logic for logic queue

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new 5b77793  Polish the resetZero logic for logic queue
5b77793 is described below

commit 5b777932d941b1833a510125377a426cf03604ce
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Dec 2 18:57:52 2021 +0800

    Polish the resetZero logic for logic queue
---
 .../rocketmq/broker/processor/ConsumerManageProcessor.java   | 12 +++++++++---
 .../protocol/header/QueryConsumerOffsetRequestHeader.java    | 10 ++++++++++
 .../common/statictopic/TopicQueueMappingContext.java         |  5 -----
 3 files changed, 19 insertions(+), 8 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
index aad9454..0443396 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -141,8 +141,10 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
             if (!mappingContext.isLeader()) {
                 return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), mappingDetail.getBname()));
             }
-            if (mappingContext.checkIfAsPhysical()) {
-                //let it go
+            List<LogicQueueMappingItem> mappingItemList = mappingContext.getMappingItemList();
+            if (mappingItemList.size() == 1
+                    &&  mappingItemList.get(0).getLogicOffset() == 0) {
+                //as physical, just let it go
                 requestHeader.setQueueId(mappingContext.getLeaderItem().getQueueId());
                 return null;
             }
@@ -169,6 +171,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
                     requestHeader.setBname(mappingItem.getBname());
                     requestHeader.setQueueId(mappingItem.getQueueId());
                     requestHeader.setPhysical(true);
+                    requestHeader.setSetZeroIfNotFound(false);
                     RpcRequest rpcRequest = new RpcRequest(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader, null);
                     RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
                     if (rpcResponse.getException() != null) {
@@ -230,7 +233,10 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
             long minOffset =
                 this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),
                     requestHeader.getQueueId());
-            if (minOffset <= 0
+            if (requestHeader.getSetZeroIfNotFound() != null && Boolean.FALSE.equals(requestHeader.getSetZeroIfNotFound())) {
+                response.setCode(ResponseCode.QUERY_NOT_FOUND);
+                response.setRemark("Not found, do not set to zero, maybe this group boot first");
+            }else if (minOffset <= 0
                 && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
                 requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
                 responseHeader.setOffset(0L);
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java
index b7714da..ebcbe0d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java
@@ -32,6 +32,8 @@ public class QueryConsumerOffsetRequestHeader extends TopicQueueRequestHeader {
     @CFNotNull
     private Integer queueId;
 
+    private Boolean setZeroIfNotFound;
+
     @Override
     public void checkFields() throws RemotingCommandException {
     }
@@ -63,4 +65,12 @@ public class QueryConsumerOffsetRequestHeader extends TopicQueueRequestHeader {
     public void setQueueId(Integer queueId) {
         this.queueId = queueId;
     }
+
+    public Boolean getSetZeroIfNotFound() {
+        return setZeroIfNotFound;
+    }
+
+    public void setSetZeroIfNotFound(Boolean setZeroIfNotFound) {
+        this.setZeroIfNotFound = setZeroIfNotFound;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java
index f705c43..d6a6fd9 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java
@@ -38,11 +38,6 @@ public class TopicQueueMappingContext  {
 
     }
 
-    public boolean checkIfAsPhysical() {
-        return mappingDetail == null
-                || mappingItemList == null
-                || (mappingItemList.size() == 1 &&  mappingItemList.get(0).getLogicOffset() == 0);
-    }
 
     public boolean isLeader() {
         return leaderItem != null && leaderItem.getBname().equals(mappingDetail.getBname());