You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2023/03/16 11:55:25 UTC

[rocketmq] 05/06: ReceiptHandleProcessor message renewal strategy optimization

This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit feb1f3077aef151f2eee9d34980ca47eebf2fc90
Author: loboxu <lo...@tencent.com>
AuthorDate: Thu Mar 9 23:05:22 2023 +0800

    ReceiptHandleProcessor message renewal strategy optimization
---
 .../proxy/common/MessageReceiptHandle.java         | 15 +++++----
 .../proxy/processor/ReceiptHandleProcessor.java    |  8 +++--
 .../processor/ReceiptHandleProcessorTest.java      | 37 ++++++++++------------
 3 files changed, 31 insertions(+), 29 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
index 8fa6583b2..e885cf4c2 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
@@ -21,7 +21,6 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.common.consumer.ReceiptHandle;
-import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
 
 public class MessageReceiptHandle {
     private final String group;
@@ -33,9 +32,9 @@ public class MessageReceiptHandle {
     private final int reconsumeTimes;
 
     private final AtomicInteger renewRetryTimes = new AtomicInteger(0);
+    private final AtomicInteger renewTimes = new AtomicInteger(0);
     private final long consumeTimestamp;
     private volatile String receiptHandleStr;
-    private final RetryPolicy renewStrategyPolicy;
 
     public MessageReceiptHandle(String group, String topic, int queueId, String receiptHandleStr, String messageId,
         long queueOffset, int reconsumeTimes) {
@@ -49,7 +48,6 @@ public class MessageReceiptHandle {
         this.queueOffset = queueOffset;
         this.reconsumeTimes = reconsumeTimes;
         this.consumeTimestamp = receiptHandle.getRetrieveTime();
-        this.renewStrategyPolicy = new RenewStrategyPolicy();
     }
 
     @Override
@@ -134,6 +132,14 @@ public class MessageReceiptHandle {
         return this.renewRetryTimes.incrementAndGet();
     }
 
+    public int incrementRenewTimes() {
+        return this.renewTimes.incrementAndGet();
+    }
+
+    public int getRenewTimes() {
+        return this.renewTimes.get();
+    }
+
     public void resetRenewRetryTimes() {
         this.renewRetryTimes.set(0);
     }
@@ -142,7 +148,4 @@ public class MessageReceiptHandle {
         return this.renewRetryTimes.get();
     }
 
-    public RetryPolicy getRenewStrategyPolicy() {
-        return this.renewStrategyPolicy;
-    }
 }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index 7b7982bab..357e94249 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -46,6 +46,7 @@ import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.common.ProxyException;
 import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
 import org.apache.rocketmq.proxy.common.ReceiptHandleGroup;
+import org.apache.rocketmq.proxy.common.RenewStrategyPolicy;
 import org.apache.rocketmq.proxy.common.StartAndShutdown;
 import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
 import org.apache.rocketmq.proxy.common.utils.ExceptionUtils;
@@ -175,10 +176,10 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
                 return CompletableFuture.completedFuture(null);
             }
             if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) {
-                RetryPolicy renewPolicy = messageReceiptHandle.getRenewStrategyPolicy();
+                RetryPolicy renewPolicy = new RenewStrategyPolicy();
                 CompletableFuture<AckResult> future =
                     messagingProcessor.changeInvisibleTime(context, handle, messageReceiptHandle.getMessageId(),
-                        messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), renewPolicy.nextDelayDuration(messageReceiptHandle.getRenewRetryTimes()));
+                        messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), renewPolicy.nextDelayDuration(messageReceiptHandle.getRenewTimes()));
                 future.whenComplete((ackResult, throwable) -> {
                     if (throwable != null) {
                         log.error("error when renew. handle:{}", messageReceiptHandle, throwable);
@@ -191,9 +192,10 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
                     } else if (AckStatus.OK.equals(ackResult.getStatus())) {
                         messageReceiptHandle.updateReceiptHandle(ackResult.getExtraInfo());
                         messageReceiptHandle.resetRenewRetryTimes();
+                        messageReceiptHandle.incrementRenewTimes();
                         resFuture.complete(messageReceiptHandle);
                     } else {
-                        log.error("renew response is not ok. result:{}", ackResult, messageReceiptHandle);
+                        log.error("renew response is not ok. result:{}, handle:{}", ackResult, messageReceiptHandle);
                         resFuture.complete(null);
                     }
                 });
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
index 6d77c1471..c0bff981f 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
@@ -124,7 +124,8 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
         SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
         Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig);
         Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
-        long newInvisibleTime = 2000L;
+        long newInvisibleTime = 18000L;
+
         ReceiptHandle newReceiptHandleClass = ReceiptHandle.builder()
             .startOffset(0L)
             .retrieveTime(System.currentTimeMillis() - newInvisibleTime + config.getRenewAheadTimeMillis() - 5)
@@ -137,20 +138,28 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
             .commitLogOffset(0L)
             .build();
         String newReceiptHandle = newReceiptHandleClass.encode();
+
+        RetryPolicy retryPolicy = new RenewStrategyPolicy();
+        AtomicInteger times = new AtomicInteger(0);
+
         AckResult ackResult = new AckResult();
         ackResult.setStatus(AckStatus.OK);
         ackResult.setExtraInfo(newReceiptHandle);
+
         Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
-            Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills())))
+            Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.get()))))
             .thenReturn(CompletableFuture.completedFuture(ackResult));
         receiptHandleProcessor.scheduleRenewTask();
+
         Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1))
             .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.argThat(r -> r.getInvisibleTime() == INVISIBLE_TIME), Mockito.eq(MESSAGE_ID),
-                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()));
+                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.get())));
         receiptHandleProcessor.scheduleRenewTask();
+
         Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1))
             .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.argThat(r -> r.getInvisibleTime() == newInvisibleTime), Mockito.eq(MESSAGE_ID),
-                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()));
+                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.incrementAndGet())));
+        receiptHandleProcessor.scheduleRenewTask();
     }
 
     @Test
@@ -164,20 +173,11 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
         ackResultFuture.completeExceptionally(new MQClientException(0, "error"));
 
         RetryPolicy retryPolicy = new RenewStrategyPolicy();
-        AtomicInteger times = new AtomicInteger(0);
 
         Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
-            Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement()))))
+            Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(messageReceiptHandle.getRenewTimes()))))
             .thenReturn(ackResultFuture);
 
-        Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
-                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement()))))
-                .thenReturn(ackResultFuture);
-
-        Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
-                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement()))))
-                .thenReturn(ackResultFuture);
-
         await().atMost(Duration.ofSeconds(1)).until(() -> {
             receiptHandleProcessor.scheduleRenewTask();
             try {
@@ -188,12 +188,9 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
             }
         });
 
-        times = new AtomicInteger(0);
-        for (int i = 0; i < config.getMaxRenewRetryTimes(); i++) {
-            Mockito.verify(messagingProcessor, Mockito.times(1))
-                    .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
-                            Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement())));
-        }
+        Mockito.verify(messagingProcessor, Mockito.times(3))
+            .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
+                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(messageReceiptHandle.getRenewTimes())));
     }
 
     @Test