You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/02 09:48:53 UTC
[rocketmq] branch 5.0.0-alpha-static-topic updated: Fix the computePullFromWhereWithException
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
new 74565fd Fix the computePullFromWhereWithException
74565fd is described below
commit 74565fdda0cdb0a2b60774d399960e197fb982d2
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Dec 2 17:48:31 2021 +0800
Fix the computePullFromWhereWithException
---
.../rocketmq/broker/processor/PullMessageProcessor.java | 4 +++-
.../client/impl/consumer/DefaultMQPushConsumerImpl.java | 4 ++++
.../rocketmq/client/impl/consumer/RebalancePushImpl.java | 14 +++++++++++---
3 files changed, 18 insertions(+), 4 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index aa6879b..6bac707 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -200,6 +200,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
//actually, we need to do nothing, but keep the code structure here
if (code == ResponseCode.PULL_OFFSET_MOVED) {
responseCode = ResponseCode.PULL_OFFSET_MOVED;
+ nextBeginOffset = maxOffset;
} else {
//maybe current broker is the slave
responseCode = code;
@@ -217,11 +218,12 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
if (requestOffset < minOffset) {
if (code == ResponseCode.PULL_OFFSET_MOVED) {
responseCode = ResponseCode.PULL_OFFSET_MOVED;
+ nextBeginOffset = minOffset;
} else {
//maybe read from slave, but we still set it to moved
responseCode = ResponseCode.PULL_OFFSET_MOVED;
+ nextBeginOffset = minOffset;
}
- nextBeginOffset = minOffset;
} else if (requestOffset >= maxOffset) {
//just move to another item
LogicQueueMappingItem nextItem = TopicQueueMappingUtils.findNext(mappingContext.getMappingItemList(), currentItem, true);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 9d158c6..6365f96 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -72,6 +72,7 @@ import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
+import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ConsumeStatus;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.PopProcessQueueInfo;
@@ -297,6 +298,9 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
long offset = -1L;
try {
offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
+ if (offset < 0) {
+ throw new MQClientException(ResponseCode.SYSTEM_ERROR, "Unexpected offset " + offset);
+ }
} catch (Exception e) {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index d28a046..76803dd 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -189,7 +189,8 @@ public class RebalancePushImpl extends RebalanceImpl {
}
}
} else {
- result = -1;
+ throw new MQClientException(ResponseCode.QUERY_NOT_FOUND, "Failed to query consume offset from " +
+ "offset store");
}
break;
}
@@ -198,9 +199,11 @@ public class RebalancePushImpl extends RebalanceImpl {
if (lastOffset >= 0) {
result = lastOffset;
} else if (-1 == lastOffset) {
+ //the offset will be fixed by the OFFSET_ILLEGAL process
result = 0L;
} else {
- result = -1;
+ throw new MQClientException(ResponseCode.QUERY_NOT_FOUND, "Failed to query offset from offset " +
+ "store");
}
break;
}
@@ -227,7 +230,8 @@ public class RebalancePushImpl extends RebalanceImpl {
}
}
} else {
- result = -1;
+ throw new MQClientException(ResponseCode.QUERY_NOT_FOUND, "Failed to query offset from offset " +
+ "store");
}
break;
}
@@ -236,6 +240,10 @@ public class RebalancePushImpl extends RebalanceImpl {
break;
}
+ if (result < 0) {
+ throw new MQClientException(ResponseCode.SYSTEM_ERROR, "Found unexpected result " + result);
+ }
+
return result;
}