You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2023/03/16 11:55:23 UTC
[rocketmq] 03/06: ReceiptHandleProcessor message renewal strategy optimization #6232
This is an automated email from the ASF dual-hosted git repository.
yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit fce64608b1335bb208c277ed6320d8d4cf8e2111
Author: loboxu <lo...@tencent.com>
AuthorDate: Wed Mar 8 00:23:18 2023 +0800
ReceiptHandleProcessor message renewal strategy optimization #6232
---
.../proxy/common/MessageReceiptHandle.java | 2 +-
.../processor/ReceiptHandleProcessorTest.java | 22 +++++++++++++++++-----
2 files changed, 18 insertions(+), 6 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
index 263d6157d..8fa6583b2 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
@@ -142,7 +142,7 @@ public class MessageReceiptHandle {
return this.renewRetryTimes.get();
}
- public RetryPolicy getRenewStrategyPolicy(){
+ public RetryPolicy getRenewStrategyPolicy() {
return this.renewStrategyPolicy;
}
}
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
index 1037ea8db..9d450fdf2 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
@@ -43,7 +43,13 @@ import org.apache.rocketmq.client.consumer.AckStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
-import org.apache.rocketmq.proxy.common.*;
+import org.apache.rocketmq.proxy.common.ContextVariable;
+import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.common.ProxyException;
+import org.apache.rocketmq.proxy.common.RenewStrategyPolicy;
+import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
+import org.apache.rocketmq.proxy.common.ReceiptHandleGroup;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
@@ -255,10 +261,16 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
futureList.add(ackResultFuture);
futureList.add(ackResultFuture);
}
- Mockito.doAnswer((Answer<CompletableFuture<AckResult>>) mock -> {
- return futureList.get(count.getAndIncrement());
- }).when(messagingProcessor).changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
- Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()));
+
+ RetryPolicy retryPolicy = new RenewStrategyPolicy();
+ AtomicInteger times = new AtomicInteger(0);
+ for (int i = 0; i < 6; i++) {
+ Mockito.doAnswer((Answer<CompletableFuture<AckResult>>) mock -> {
+ return futureList.get(count.getAndIncrement());
+ }).when(messagingProcessor).changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
+ Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement())));
+ }
+
await().pollDelay(Duration.ZERO).pollInterval(Duration.ofMillis(10)).atMost(Duration.ofSeconds(10)).until(() -> {
receiptHandleProcessor.scheduleRenewTask();
try {