You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2023/02/23 11:00:18 UTC
[rocketmq] branch develop updated: [ISSUE #6163] Fix the issue of infinite retry of order message (#6164)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 7cfffe7f4 [ISSUE #6163] Fix the issue of infinite retry of order message (#6164)
7cfffe7f4 is described below
commit 7cfffe7f48e1db9e9db3641e8ba6aed3e465281c
Author: rongtong <ji...@163.com>
AuthorDate: Thu Feb 23 18:59:56 2023 +0800
[ISSUE #6163] Fix the issue of infinite retry of order message (#6164)
---
.../rocketmq/broker/processor/SendMessageProcessor.java | 12 ++++++++++--
.../client/impl/consumer/ConsumeMessageOrderlyService.java | 2 +-
2 files changed, 11 insertions(+), 3 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 00b85b8c1..092db4ed5 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -184,8 +184,16 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}
int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
- // Using '>' instead of '>=' to compatible with the case that reconsumeTimes here are increased by client.
- if (reconsumeTimes > maxReconsumeTimes) {
+
+ boolean sendRetryMessageToDeadLetterQueueDirectly = false;
+ if (!brokerController.getRebalanceLockManager().isLockAllExpired(groupName)) {
+ LOGGER.info("Group has unexpired lock record, which show it is ordered message, send it to DLQ "
+ + "right now group={}, topic={}, reconsumeTimes={}, maxReconsumeTimes={}.", groupName,
+ newTopic, reconsumeTimes, maxReconsumeTimes);
+ sendRetryMessageToDeadLetterQueueDirectly = true;
+ }
+
+ if (reconsumeTimes > maxReconsumeTimes || sendRetryMessageToDeadLetterQueueDirectly) {
Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
.put(LABEL_CONSUMER_GROUP, requestHeader.getProducerGroup())
.put(LABEL_TOPIC, requestHeader.getTopic())
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
index f9531a4be..29a846dfd 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
@@ -388,7 +388,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
newMsg.setFlag(msg.getFlag());
MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
- MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes()));
+ MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED);
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());