You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/02 09:48:53 UTC

[rocketmq] branch 5.0.0-alpha-static-topic updated: Fix the computePullFromWhereWithException

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new 74565fd  Fix the computePullFromWhereWithException
74565fd is described below

commit 74565fdda0cdb0a2b60774d399960e197fb982d2
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Dec 2 17:48:31 2021 +0800

    Fix the computePullFromWhereWithException
---
 .../rocketmq/broker/processor/PullMessageProcessor.java    |  4 +++-
 .../client/impl/consumer/DefaultMQPushConsumerImpl.java    |  4 ++++
 .../rocketmq/client/impl/consumer/RebalancePushImpl.java   | 14 +++++++++++---
 3 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index aa6879b..6bac707 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -200,6 +200,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
                         //actually, we need do nothing, but keep the code structure here
                         if (code == ResponseCode.PULL_OFFSET_MOVED) {
                             responseCode = ResponseCode.PULL_OFFSET_MOVED;
+                            nextBeginOffset = maxOffset;
                         } else {
                             //maybe current broker is the slave
                             responseCode = code;
@@ -217,11 +218,12 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
                     if (requestOffset < minOffset) {
                         if (code == ResponseCode.PULL_OFFSET_MOVED) {
                             responseCode = ResponseCode.PULL_OFFSET_MOVED;
+                            nextBeginOffset = minOffset;
                         } else {
                             //maybe read from slave, but we still set it to moved
                             responseCode = ResponseCode.PULL_OFFSET_MOVED;
+                            nextBeginOffset = minOffset;
                         }
-                        nextBeginOffset = minOffset;
                     } else if (requestOffset >= maxOffset) {
                         //just move to another item
                         LogicQueueMappingItem nextItem = TopicQueueMappingUtils.findNext(mappingContext.getMappingItemList(), currentItem, true);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 9d158c6..6365f96 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -72,6 +72,7 @@ import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.NamespaceUtil;
+import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.body.ConsumeStatus;
 import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
 import org.apache.rocketmq.common.protocol.body.PopProcessQueueInfo;
@@ -297,6 +298,9 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
                     long offset = -1L;
                     try {
                         offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
+                        if (offset < 0) {
+                            throw new MQClientException(ResponseCode.SYSTEM_ERROR, "Unexpected offset " + offset);
+                        }
                     } catch (Exception e) {
                         this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
                         log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index d28a046..76803dd 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -189,7 +189,8 @@ public class RebalancePushImpl extends RebalanceImpl {
                         }
                     }
                 } else {
-                    result = -1;
+                    throw new MQClientException(ResponseCode.QUERY_NOT_FOUND, "Failed to query consume offset from " +
+                            "offset store");
                 }
                 break;
             }
@@ -198,9 +199,11 @@ public class RebalancePushImpl extends RebalanceImpl {
                 if (lastOffset >= 0) {
                     result = lastOffset;
                 } else if (-1 == lastOffset) {
+                    //the offset will be fixed by the OFFSET_ILLEGAL process
                     result = 0L;
                 } else {
-                    result = -1;
+                    throw new MQClientException(ResponseCode.QUERY_NOT_FOUND, "Failed to query offset from offset " +
+                            "store");
                 }
                 break;
             }
@@ -227,7 +230,8 @@ public class RebalancePushImpl extends RebalanceImpl {
                         }
                     }
                 } else {
-                    result = -1;
+                    throw new MQClientException(ResponseCode.QUERY_NOT_FOUND, "Failed to query offset from offset " +
+                            "store");
                 }
                 break;
             }
@@ -236,6 +240,10 @@ public class RebalancePushImpl extends RebalanceImpl {
                 break;
         }
 
+        if (result < 0) {
+            throw new MQClientException(ResponseCode.SYSTEM_ERROR, "Found unexpected result " + result);
+        }
+
         return result;
     }