You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain text version.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2023/02/06 03:29:43 UTC
[rocketmq] branch develop updated: [ISSUE #5847] Add checkBlock for hasMsgFromQueue
This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 671977e46 [ISSUE #5847] Add checkBlock for hasMsgFromQueue
671977e46 is described below
commit 671977e4694865c6102cc85ce2e2fd0a8833561e
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Fri Feb 3 16:07:34 2023 +0800
[ISSUE #5847] Add checkBlock for hasMsgFromQueue
---
.../org/apache/rocketmq/broker/processor/NotificationProcessor.java | 4 ++++
1 file changed, 4 insertions(+)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
index e10d72328..3b306ca2d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
@@ -195,6 +195,7 @@ public class NotificationProcessor implements NettyRequestProcessor {
response.setRemark(errorInfo);
return response;
}
+
SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
@@ -263,6 +264,9 @@ public class NotificationProcessor implements NettyRequestProcessor {
}
private boolean hasMsgFromQueue(boolean isRetry, NotificationRequestHeader requestHeader, int queueId) {
+ if (this.brokerController.getConsumerOrderInfoManager().checkBlock(requestHeader.getTopic(), requestHeader.getConsumerGroup(), queueId, 0)) {
+ return false;
+ }
String topic = isRetry ? KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup()) : requestHeader.getTopic();
long offset = getPopOffset(topic, requestHeader.getConsumerGroup(), queueId);
long restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset;