You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/11/09 12:02:39 UTC
[rocketmq] branch develop updated: [ISSUE #5481] Decrease the repeated consumption probability of expired message (#5483)
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new c217d8bb8 [ISSUE #5481] Decrease the repeated consumption probability of expired message (#5483)
c217d8bb8 is described below
commit c217d8bb80d933da4bf21c36041bae684072c629
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Wed Nov 9 20:02:20 2022 +0800
[ISSUE #5481] Decrease the repeated consumption probability of expired message (#5483)
* Decrease the repeated consumption probability of expired message
* Polish code
* Make test method name consistent with that of main source file
Co-authored-by: Li Zhanhui <li...@gmail.com>
---
.../consumer/ConsumeMessageConcurrentlyService.java | 7 +++++++
.../rocketmq/client/impl/consumer/ProcessQueue.java | 21 +++++++++++++++++++++
.../client/impl/consumer/ProcessQueueTest.java | 13 +++++++++++++
3 files changed, 41 insertions(+)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
index 0fbdc5ced..aee699ea2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
@@ -284,6 +284,13 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
List<MessageExt> msgBackFailed = new ArrayList<>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
+ // Maybe message is expired and cleaned, just ignore it.
+ if (!consumeRequest.getProcessQueue().containsMessage(msg)) {
+ log.info("Message is not found in its process queue; skip send-back-procedure, topic={}, "
+ + "brokerName={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getBrokerName(),
+ msg.getQueueId(), msg.getQueueOffset());
+ continue;
+ }
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index 9ffab3cba..0fdec4737 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -334,6 +334,27 @@ public class ProcessQueue {
return result;
}
+ /**
+ * Returns true only if msgTreeMap holds an entry keyed by the message's queue offset (checked under the tree-map read lock); returns false for a null message or when the check fails.
+ */
+ public boolean containsMessage(MessageExt message) {
+ if (message == null) {
+ // Defensive guard: callers are not expected to pass null, so this should never be reached.
+ return false;
+ }
+ try {
+ this.treeMapLock.readLock().lockInterruptibly();
+ try {
+ return this.msgTreeMap.containsKey(message.getQueueOffset());
+ } finally {
+ this.treeMapLock.readLock().unlock();
+ }
+ } catch (Throwable t) { // NOTE(review): this also catches interruption from lockInterruptibly(); the interrupt status is not restored here
+ log.error("Failed to check message's existence in process queue, message={}", message, t);
+ }
+ return false; // reached only after a failure was logged above
+ }
+
public boolean hasTempMessage() {
try {
this.treeMapLock.readLock().lockInterruptibly();
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java
index a31c5fb25..259d6430b 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java
@@ -21,6 +21,7 @@ import java.util.Collections;
import java.util.List;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
+import org.assertj.core.util.Lists;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
@@ -64,6 +65,18 @@ public class ProcessQueueTest {
assertThat(pq.getMsgSize().get()).isEqualTo(89 * 123);
}
+ @Test
+ public void testContainsMessage() { // checks that containsMessage tells a queued message apart from one that was never queued
+ ProcessQueue pq = new ProcessQueue();
+ final List<MessageExt> messageList = createMessageList(2); // presumably two messages with distinct queue offsets -- confirm in createMessageList
+ final MessageExt message0 = messageList.get(0);
+ final MessageExt message1 = messageList.get(1);
+
+ pq.putMessage(Lists.list(message0)); // only message0 is put into the process queue
+ assertThat(pq.containsMessage(message0)).isTrue();
+ assertThat(pq.containsMessage(message1)).isFalse(); // message1 was never put, so it must be reported as absent
+ }
+
@Test
public void testFillProcessQueueInfo() {
ProcessQueue pq = new ProcessQueue();