You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2023/03/16 11:55:23 UTC

[rocketmq] 03/06: ReceiptHandleProcessor message renewal strategy optimization #6232

This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit fce64608b1335bb208c277ed6320d8d4cf8e2111
Author: loboxu <lo...@tencent.com>
AuthorDate: Wed Mar 8 00:23:18 2023 +0800

    ReceiptHandleProcessor message renewal strategy optimization #6232
---
 .../proxy/common/MessageReceiptHandle.java         |  2 +-
 .../processor/ReceiptHandleProcessorTest.java      | 22 +++++++++++++++++-----
 2 files changed, 18 insertions(+), 6 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
index 263d6157d..8fa6583b2 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
@@ -142,7 +142,7 @@ public class MessageReceiptHandle {
         return this.renewRetryTimes.get();
     }
 
-    public RetryPolicy getRenewStrategyPolicy(){
+    public RetryPolicy getRenewStrategyPolicy() {
         return this.renewStrategyPolicy;
     }
 }
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
index 1037ea8db..9d450fdf2 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
@@ -43,7 +43,13 @@ import org.apache.rocketmq.client.consumer.AckStatus;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.consumer.ReceiptHandle;
 import org.apache.rocketmq.common.message.MessageClientIDSetter;
-import org.apache.rocketmq.proxy.common.*;
+import org.apache.rocketmq.proxy.common.ContextVariable;
+import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.common.ProxyException;
+import org.apache.rocketmq.proxy.common.RenewStrategyPolicy;
+import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
+import org.apache.rocketmq.proxy.common.ReceiptHandleGroup;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.config.ProxyConfig;
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
@@ -255,10 +261,16 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
             futureList.add(ackResultFuture);
             futureList.add(ackResultFuture);
         }
-        Mockito.doAnswer((Answer<CompletableFuture<AckResult>>) mock -> {
-            return futureList.get(count.getAndIncrement());
-        }).when(messagingProcessor).changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
-            Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()));
+
+        RetryPolicy retryPolicy = new RenewStrategyPolicy();
+        AtomicInteger times = new AtomicInteger(0);
+        for (int i = 0; i < 6; i++) {
+            Mockito.doAnswer((Answer<CompletableFuture<AckResult>>) mock -> {
+                return futureList.get(count.getAndIncrement());
+            }).when(messagingProcessor).changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
+                    Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement())));
+        }
+
         await().pollDelay(Duration.ZERO).pollInterval(Duration.ofMillis(10)).atMost(Duration.ofSeconds(10)).until(() -> {
             receiptHandleProcessor.scheduleRenewTask();
             try {