You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2023/02/23 11:00:18 UTC

[rocketmq] branch develop updated: [ISSUE #6163] Fix the issue of infinite retry of order message (#6164)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7cfffe7f4 [ISSUE #6163] Fix the issue of infinite retry of order message (#6164)
7cfffe7f4 is described below

commit 7cfffe7f48e1db9e9db3641e8ba6aed3e465281c
Author: rongtong <ji...@163.com>
AuthorDate: Thu Feb 23 18:59:56 2023 +0800

    [ISSUE #6163] Fix the issue of infinite retry of order message (#6164)
---
 .../rocketmq/broker/processor/SendMessageProcessor.java      | 12 ++++++++++--
 .../client/impl/consumer/ConsumeMessageOrderlyService.java   |  2 +-
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 00b85b8c1..092db4ed5 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -184,8 +184,16 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
                 maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
             }
             int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
-            // Using '>' instead of '>=' to compatible with the case that reconsumeTimes here are increased by client.
-            if (reconsumeTimes > maxReconsumeTimes) {
+
+            boolean sendRetryMessageToDeadLetterQueueDirectly = false;
+            if (!brokerController.getRebalanceLockManager().isLockAllExpired(groupName)) {
+                LOGGER.info("Group has unexpired lock record, which show it is ordered message, send it to DLQ "
+                        + "right now group={}, topic={}, reconsumeTimes={}, maxReconsumeTimes={}.", groupName,
+                    newTopic, reconsumeTimes, maxReconsumeTimes);
+                sendRetryMessageToDeadLetterQueueDirectly = true;
+            }
+
+            if (reconsumeTimes > maxReconsumeTimes || sendRetryMessageToDeadLetterQueueDirectly) {
                 Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
                     .put(LABEL_CONSUMER_GROUP, requestHeader.getProducerGroup())
                     .put(LABEL_TOPIC, requestHeader.getTopic())
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
index f9531a4be..29a846dfd 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
@@ -388,7 +388,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
             MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
             newMsg.setFlag(msg.getFlag());
             MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
-            MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes()));
+            MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
             MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
             MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED);
             newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());