You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/11/09 12:02:39 UTC

[rocketmq] branch develop updated: [ISSUE #5481] Decrease the repeated consumption probability of expired message (#5483)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new c217d8bb8 [ISSUE #5481] Decrease the repeated consumption probability of expired message (#5483)
c217d8bb8 is described below

commit c217d8bb80d933da4bf21c36041bae684072c629
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Wed Nov 9 20:02:20 2022 +0800

    [ISSUE #5481] Decrease the repeated consumption probability of expired message (#5483)
    
    * Decrease the repeated consumption probability of expired message
    
    * Polish code
    
    * Make test method name consistent with that of main source file
    
    Co-authored-by: Li Zhanhui <li...@gmail.com>
---
 .../consumer/ConsumeMessageConcurrentlyService.java |  7 +++++++
 .../rocketmq/client/impl/consumer/ProcessQueue.java | 21 +++++++++++++++++++++
 .../client/impl/consumer/ProcessQueueTest.java      | 13 +++++++++++++
 3 files changed, 41 insertions(+)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
index 0fbdc5ced..aee699ea2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
@@ -284,6 +284,13 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
                 List<MessageExt> msgBackFailed = new ArrayList<>(consumeRequest.getMsgs().size());
                 for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                     MessageExt msg = consumeRequest.getMsgs().get(i);
+                    // Maybe message is expired and cleaned, just ignore it.
+                    if (!consumeRequest.getProcessQueue().containsMessage(msg)) {
+                        log.info("Message is not found in its process queue; skip send-back-procedure, topic={}, "
+                                + "brokerName={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getBrokerName(),
+                            msg.getQueueId(), msg.getQueueOffset());
+                        continue;
+                    }
                     boolean result = this.sendMessageBack(msg, context);
                     if (!result) {
                         msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index 9ffab3cba..0fdec4737 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -334,6 +334,27 @@ public class ProcessQueue {
         return result;
     }
 
+    /**
+     * Return the result that whether current message is exist in the process queue or not.
+     */
+    public boolean containsMessage(MessageExt message) {
+        if (message == null) {
+            // should never reach here.
+            return false;
+        }
+        try {
+            this.treeMapLock.readLock().lockInterruptibly();
+            try {
+                return this.msgTreeMap.containsKey(message.getQueueOffset());
+            } finally {
+                this.treeMapLock.readLock().unlock();
+            }
+        } catch (Throwable t) {
+            log.error("Failed to check message's existence in process queue, message={}", message, t);
+        }
+        return false;
+    }
+
     public boolean hasTempMessage() {
         try {
             this.treeMapLock.readLock().lockInterruptibly();
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java
index a31c5fb25..259d6430b 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java
@@ -21,6 +21,7 @@ import java.util.Collections;
 import java.util.List;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
+import org.assertj.core.util.Lists;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.junit.MockitoJUnitRunner;
@@ -64,6 +65,18 @@ public class ProcessQueueTest {
         assertThat(pq.getMsgSize().get()).isEqualTo(89 * 123);
     }
 
+    @Test
+    public void testContainsMessage() {
+        ProcessQueue pq = new ProcessQueue();
+        final List<MessageExt> messageList = createMessageList(2);
+        final MessageExt message0 = messageList.get(0);
+        final MessageExt message1 = messageList.get(1);
+
+        pq.putMessage(Lists.list(message0));
+        assertThat(pq.containsMessage(message0)).isTrue();
+        assertThat(pq.containsMessage(message1)).isFalse();
+    }
+
     @Test
     public void testFillProcessQueueInfo() {
         ProcessQueue pq = new ProcessQueue();