You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2023/03/16 11:55:20 UTC

[rocketmq] branch develop updated (6fd1bb9a2 -> 877f146f5)

This is an automated email from the ASF dual-hosted git repository.

yukon pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


    from 6fd1bb9a2 [ISSUE #6358] Add chinese docs about persistent-unique-broker-id (#6359)
     new 667ebbcc1 ReceiptHandleProcessor message renewal strategy optimization #6232
     new 06a87e650 ReceiptHandleProcessor message renewal strategy optimization #6232
     new fce64608b ReceiptHandleProcessor message renewal strategy optimization #6232
     new 1e3e7638f ReceiptHandleProcessor message renewal strategy optimization #6232
     new feb1f3077 ReceiptHandleProcessor message renewal strategy optimization
     new 877f146f5 ReceiptHandleProcessor message renewal strategy optimization

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../proxy/common/MessageReceiptHandle.java         | 10 ++++
 .../rocketmq/proxy/common/RenewStrategyPolicy.java | 70 ++++++++++++++++++++++
 .../apache/rocketmq/proxy/config/ProxyConfig.java  | 23 ++++---
 .../grpc/v2/consumer/ReceiveMessageActivity.java   |  2 +-
 .../proxy/processor/ReceiptHandleProcessor.java    |  7 ++-
 .../proxy/common/RenewStrategyPolicyTest.java      | 66 ++++++++++++++++++++
 .../processor/ReceiptHandleProcessorTest.java      | 48 +++++++++++----
 7 files changed, 201 insertions(+), 25 deletions(-)
 create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewStrategyPolicy.java
 create mode 100644 proxy/src/test/java/org/apache/rocketmq/proxy/common/RenewStrategyPolicyTest.java


[rocketmq] 01/06: ReceiptHandleProcessor message renewal strategy optimization #6232

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 667ebbcc142a0584c2f30dc0ae3c135017127529
Author: loboxu <lo...@tencent.com>
AuthorDate: Tue Mar 7 22:14:27 2023 +0800

    ReceiptHandleProcessor message renewal strategy optimization #6232
---
 .../proxy/common/MessageReceiptHandle.java         |  7 +++
 .../rocketmq/proxy/common/RenewStrategyPolicy.java | 70 ++++++++++++++++++++++
 .../apache/rocketmq/proxy/config/ProxyConfig.java  | 23 ++++---
 .../grpc/v2/consumer/ReceiveMessageActivity.java   |  2 +-
 .../proxy/processor/ReceiptHandleProcessor.java    |  3 +-
 .../proxy/common/RenewStrategyPolicyTest.java      | 66 ++++++++++++++++++++
 .../processor/ReceiptHandleProcessorTest.java      | 16 ++---
 7 files changed, 168 insertions(+), 19 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
index 0b3c241d1..263d6157d 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
@@ -21,6 +21,7 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.common.consumer.ReceiptHandle;
+import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
 
 public class MessageReceiptHandle {
     private final String group;
@@ -34,6 +35,7 @@ public class MessageReceiptHandle {
     private final AtomicInteger renewRetryTimes = new AtomicInteger(0);
     private final long consumeTimestamp;
     private volatile String receiptHandleStr;
+    private final RetryPolicy renewStrategyPolicy;
 
     public MessageReceiptHandle(String group, String topic, int queueId, String receiptHandleStr, String messageId,
         long queueOffset, int reconsumeTimes) {
@@ -47,6 +49,7 @@ public class MessageReceiptHandle {
         this.queueOffset = queueOffset;
         this.reconsumeTimes = reconsumeTimes;
         this.consumeTimestamp = receiptHandle.getRetrieveTime();
+        this.renewStrategyPolicy = new RenewStrategyPolicy();
     }
 
     @Override
@@ -138,4 +141,8 @@ public class MessageReceiptHandle {
     public int getRenewRetryTimes() {
         return this.renewRetryTimes.get();
     }
+
+    public RetryPolicy getRenewStrategyPolicy(){
+        return this.renewStrategyPolicy;
+    }
 }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewStrategyPolicy.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewStrategyPolicy.java
new file mode 100644
index 000000000..ce33619b4
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewStrategyPolicy.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.common;
+
+import com.google.common.base.MoreObjects;
+import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
+
+import java.util.concurrent.TimeUnit;
+
+
+public class RenewStrategyPolicy implements RetryPolicy {
+    // 1m 3m 5m 10m 30m 1h
+    private long[] next = new long[]{
+            TimeUnit.MINUTES.toMillis(1),
+            TimeUnit.MINUTES.toMillis(3),
+            TimeUnit.MINUTES.toMillis(5),
+            TimeUnit.MINUTES.toMillis(10),
+            TimeUnit.MINUTES.toMillis(30),
+            TimeUnit.HOURS.toMillis(1)
+    };
+
+    public RenewStrategyPolicy() {
+    }
+
+    public RenewStrategyPolicy(long[] next) {
+        this.next = next;
+    }
+
+    public long[] getNext() {
+        return next;
+    }
+
+    public void setNext(long[] next) {
+        this.next = next;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+                .add("next", next)
+                .toString();
+    }
+
+    @Override
+    public long nextDelayDuration(int renewTimes) {
+        if (renewTimes < 0) {
+            renewTimes = 0;
+        }
+        int index = renewTimes;
+        if (index >= next.length) {
+            index = next.length - 1;
+        }
+        return next[index];
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index c65877a41..dcbf1af0e 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -107,10 +107,16 @@ public class ProxyConfig implements ConfigFile {
      */
     private int maxUserPropertySize = 16 * 1024;
     private int userPropertyMaxNum = 128;
+
     /**
      * max message group size, 0 or negative number means no limit for proxy
      */
     private int maxMessageGroupSize = 64;
+
+    /**
+     * Default invisible time, in milliseconds, applied to a message when it is popped
+     */
+    private long defaultInvisibleTimeMills = Duration.ofSeconds(60).toMillis();
     private long minInvisibleTimeMillsForRecv = Duration.ofSeconds(10).toMillis();
     private long maxInvisibleTimeMills = Duration.ofHours(12).toMillis();
     private long maxDelayTimeMills = Duration.ofDays(1).toMillis();
@@ -180,7 +186,6 @@ public class ProxyConfig implements ConfigFile {
     private int renewThreadPoolQueueCapacity = 300;
     private long lockTimeoutMsInHandleGroup = TimeUnit.SECONDS.toMillis(3);
     private long renewAheadTimeMillis = TimeUnit.SECONDS.toMillis(10);
-    private long renewSliceTimeMillis = TimeUnit.SECONDS.toMillis(60);
     private long renewMaxTimeMillis = TimeUnit.HOURS.toMillis(3);
     private long renewSchedulePeriodMillis = TimeUnit.SECONDS.toMillis(5);
 
@@ -555,6 +560,14 @@ public class ProxyConfig implements ConfigFile {
         this.minInvisibleTimeMillsForRecv = minInvisibleTimeMillsForRecv;
     }
 
+    public long getDefaultInvisibleTimeMills() {
+        return defaultInvisibleTimeMills;
+    }
+
+    public void setDefaultInvisibleTimeMills(long defaultInvisibleTimeMills) {
+        this.defaultInvisibleTimeMills = defaultInvisibleTimeMills;
+    }
+
     public long getMaxInvisibleTimeMills() {
         return maxInvisibleTimeMills;
     }
@@ -1019,14 +1032,6 @@ public class ProxyConfig implements ConfigFile {
         this.renewAheadTimeMillis = renewAheadTimeMillis;
     }
 
-    public long getRenewSliceTimeMillis() {
-        return renewSliceTimeMillis;
-    }
-
-    public void setRenewSliceTimeMillis(long renewSliceTimeMillis) {
-        this.renewSliceTimeMillis = renewSliceTimeMillis;
-    }
-
     public long getRenewMaxTimeMillis() {
         return renewMaxTimeMillis;
     }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
index ddbe07083..9df4101f7 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
@@ -103,7 +103,7 @@ public class ReceiveMessageActivity extends AbstractMessingActivity {
             long actualInvisibleTime = Durations.toMillis(request.getInvisibleDuration());
             ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
             if (proxyConfig.isEnableProxyAutoRenew() && request.getAutoRenew()) {
-                actualInvisibleTime = proxyConfig.getRenewSliceTimeMillis();
+                actualInvisibleTime = proxyConfig.getDefaultInvisibleTimeMills();
             } else {
                 validateInvisibleTime(actualInvisibleTime,
                     ConfigurationManager.getProxyConfig().getMinInvisibleTimeMillsForRecv());
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index bbd507070..b0a4e8414 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -175,9 +175,10 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
                 return CompletableFuture.completedFuture(null);
             }
             if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) {
+                RetryPolicy renewPolicy = messageReceiptHandle.getRenewStrategyPolicy();
                 CompletableFuture<AckResult> future =
                     messagingProcessor.changeInvisibleTime(context, handle, messageReceiptHandle.getMessageId(),
-                        messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), proxyConfig.getRenewSliceTimeMillis());
+                        messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), renewPolicy.nextDelayDuration(messageReceiptHandle.getRenewRetryTimes()));
                 future.whenComplete((ackResult, throwable) -> {
                     if (throwable != null) {
                         log.error("error when renew. handle:{}", messageReceiptHandle, throwable);
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/common/RenewStrategyPolicyTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/common/RenewStrategyPolicyTest.java
new file mode 100644
index 000000000..54e627274
--- /dev/null
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/common/RenewStrategyPolicyTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.common;
+
+import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+
+public class RenewStrategyPolicyTest {
+
+    private RetryPolicy retryPolicy;
+    private final AtomicInteger times = new AtomicInteger(0);
+
+    @Before
+    public void before() throws Throwable {
+        this.retryPolicy = new RenewStrategyPolicy();
+    }
+
+    @Test
+    public void testNextDelayDuration() {
+        long value = this.retryPolicy.nextDelayDuration(times.getAndIncrement());
+        assertEquals(value, TimeUnit.MINUTES.toMillis(1));
+
+        value = this.retryPolicy.nextDelayDuration(times.getAndIncrement());
+        assertEquals(value, TimeUnit.MINUTES.toMillis(3));
+
+        value = this.retryPolicy.nextDelayDuration(times.getAndIncrement());
+        assertEquals(value, TimeUnit.MINUTES.toMillis(5));
+
+        value = this.retryPolicy.nextDelayDuration(times.getAndIncrement());
+        assertEquals(value, TimeUnit.MINUTES.toMillis(10));
+
+        value = this.retryPolicy.nextDelayDuration(times.getAndIncrement());
+        assertEquals(value, TimeUnit.MINUTES.toMillis(30));
+
+        value = this.retryPolicy.nextDelayDuration(times.getAndIncrement());
+        assertEquals(value, TimeUnit.HOURS.toMillis(1));
+    }
+
+
+    @After
+    public void after() {
+    }
+
+}
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
index 33057da6e..355596ba1 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
@@ -111,7 +111,7 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
         receiptHandleProcessor.scheduleRenewTask();
         Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1))
             .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
-                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis()));
+                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()));
     }
 
     @Test
@@ -139,16 +139,16 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
         ackResult.setStatus(AckStatus.OK);
         ackResult.setExtraInfo(newReceiptHandle);
         Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
-            Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis())))
+            Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills())))
             .thenReturn(CompletableFuture.completedFuture(ackResult));
         receiptHandleProcessor.scheduleRenewTask();
         Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1))
             .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.argThat(r -> r.getInvisibleTime() == INVISIBLE_TIME), Mockito.eq(MESSAGE_ID),
-                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis()));
+                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()));
         receiptHandleProcessor.scheduleRenewTask();
         Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1))
             .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.argThat(r -> r.getInvisibleTime() == newInvisibleTime), Mockito.eq(MESSAGE_ID),
-                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis()));
+                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()));
     }
 
     @Test
@@ -161,7 +161,7 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
         CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>();
         ackResultFuture.completeExceptionally(new MQClientException(0, "error"));
         Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
-            Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis())))
+            Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills())))
             .thenReturn(ackResultFuture);
 
         await().atMost(Duration.ofSeconds(1)).until(() -> {
@@ -175,7 +175,7 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
         });
         Mockito.verify(messagingProcessor, Mockito.times(config.getMaxRenewRetryTimes()))
             .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
-                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis()));
+                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()));
     }
 
     @Test
@@ -187,7 +187,7 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
         CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>();
         ackResultFuture.completeExceptionally(new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "error"));
         Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
-            Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis())))
+            Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills())))
             .thenReturn(ackResultFuture);
 
         await().atMost(Duration.ofSeconds(1)).until(() -> {
@@ -246,7 +246,7 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
         Mockito.doAnswer((Answer<CompletableFuture<AckResult>>) mock -> {
             return futureList.get(count.getAndIncrement());
         }).when(messagingProcessor).changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
-            Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis()));
+            Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()));
         await().pollDelay(Duration.ZERO).pollInterval(Duration.ofMillis(10)).atMost(Duration.ofSeconds(10)).until(() -> {
             receiptHandleProcessor.scheduleRenewTask();
             try {


[rocketmq] 02/06: ReceiptHandleProcessor message renewal strategy optimization #6232

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 06a87e65041d9e7cd653399468f94ece6c0c2300
Author: loboxu <lo...@tencent.com>
AuthorDate: Tue Mar 7 23:56:29 2023 +0800

    ReceiptHandleProcessor message renewal strategy optimization #6232
---
 .../proxy/processor/ReceiptHandleProcessor.java    |  2 +-
 .../processor/ReceiptHandleProcessorTest.java      | 32 +++++++++++++++-------
 2 files changed, 23 insertions(+), 11 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index b0a4e8414..7b7982bab 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -193,7 +193,7 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
                         messageReceiptHandle.resetRenewRetryTimes();
                         resFuture.complete(messageReceiptHandle);
                     } else {
-                        log.error("renew response is not ok. result:{}, handle:{}", ackResult, messageReceiptHandle, throwable);
+                        log.error("renew response is not ok. result:{}", ackResult, messageReceiptHandle);
                         resFuture.complete(null);
                     }
                 });
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
index 355596ba1..1037ea8db 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
@@ -43,15 +43,11 @@ import org.apache.rocketmq.client.consumer.AckStatus;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.consumer.ReceiptHandle;
 import org.apache.rocketmq.common.message.MessageClientIDSetter;
-import org.apache.rocketmq.proxy.common.ContextVariable;
-import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
-import org.apache.rocketmq.proxy.common.ProxyContext;
-import org.apache.rocketmq.proxy.common.ProxyException;
-import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
-import org.apache.rocketmq.proxy.common.ReceiptHandleGroup;
+import org.apache.rocketmq.proxy.common.*;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.config.ProxyConfig;
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
 import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
 import org.junit.Before;
 import org.junit.Test;
@@ -160,10 +156,22 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
 
         CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>();
         ackResultFuture.completeExceptionally(new MQClientException(0, "error"));
+
+        RetryPolicy retryPolicy = new RenewStrategyPolicy();
+        AtomicInteger times = new AtomicInteger(0);
+
         Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
-            Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills())))
+            Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement()))))
             .thenReturn(ackResultFuture);
 
+        Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
+                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement()))))
+                .thenReturn(ackResultFuture);
+
+        Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
+                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement()))))
+                .thenReturn(ackResultFuture);
+
         await().atMost(Duration.ofSeconds(1)).until(() -> {
             receiptHandleProcessor.scheduleRenewTask();
             try {
@@ -173,9 +181,13 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
                 return false;
             }
         });
-        Mockito.verify(messagingProcessor, Mockito.times(config.getMaxRenewRetryTimes()))
-            .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
-                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()));
+
+        times = new AtomicInteger(0);
+        for (int i = 0; i < config.getMaxRenewRetryTimes(); i++) {
+            Mockito.verify(messagingProcessor, Mockito.times(1))
+                    .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
+                            Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement())));
+        }
     }
 
     @Test


[rocketmq] 03/06: ReceiptHandleProcessor message renewal strategy optimization #6232

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit fce64608b1335bb208c277ed6320d8d4cf8e2111
Author: loboxu <lo...@tencent.com>
AuthorDate: Wed Mar 8 00:23:18 2023 +0800

    ReceiptHandleProcessor message renewal strategy optimization #6232
---
 .../proxy/common/MessageReceiptHandle.java         |  2 +-
 .../processor/ReceiptHandleProcessorTest.java      | 22 +++++++++++++++++-----
 2 files changed, 18 insertions(+), 6 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
index 263d6157d..8fa6583b2 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
@@ -142,7 +142,7 @@ public class MessageReceiptHandle {
         return this.renewRetryTimes.get();
     }
 
-    public RetryPolicy getRenewStrategyPolicy(){
+    public RetryPolicy getRenewStrategyPolicy() {
         return this.renewStrategyPolicy;
     }
 }
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
index 1037ea8db..9d450fdf2 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
@@ -43,7 +43,13 @@ import org.apache.rocketmq.client.consumer.AckStatus;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.consumer.ReceiptHandle;
 import org.apache.rocketmq.common.message.MessageClientIDSetter;
-import org.apache.rocketmq.proxy.common.*;
+import org.apache.rocketmq.proxy.common.ContextVariable;
+import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.common.ProxyException;
+import org.apache.rocketmq.proxy.common.RenewStrategyPolicy;
+import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
+import org.apache.rocketmq.proxy.common.ReceiptHandleGroup;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.config.ProxyConfig;
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
@@ -255,10 +261,16 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
             futureList.add(ackResultFuture);
             futureList.add(ackResultFuture);
         }
-        Mockito.doAnswer((Answer<CompletableFuture<AckResult>>) mock -> {
-            return futureList.get(count.getAndIncrement());
-        }).when(messagingProcessor).changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
-            Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()));
+
+        RetryPolicy retryPolicy = new RenewStrategyPolicy();
+        AtomicInteger times = new AtomicInteger(0);
+        for (int i = 0; i < 6; i++) {
+            Mockito.doAnswer((Answer<CompletableFuture<AckResult>>) mock -> {
+                return futureList.get(count.getAndIncrement());
+            }).when(messagingProcessor).changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
+                    Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement())));
+        }
+
         await().pollDelay(Duration.ZERO).pollInterval(Duration.ofMillis(10)).atMost(Duration.ofSeconds(10)).until(() -> {
             receiptHandleProcessor.scheduleRenewTask();
             try {


[rocketmq] 04/06: ReceiptHandleProcessor message renewal strategy optimization #6232

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 1e3e7638f3ac9577848ac467a2deb8c5768cb48e
Author: loboxu <lo...@tencent.com>
AuthorDate: Wed Mar 8 07:27:25 2023 +0800

    ReceiptHandleProcessor message renewal strategy optimization #6232
---
 .../org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java  | 1 +
 1 file changed, 1 insertion(+)

diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
index 9d450fdf2..6d77c1471 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
@@ -280,6 +280,7 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
                 return false;
             }
         });
+
         assertEquals(6, count.get());
     }
 


[rocketmq] 05/06: ReceiptHandleProcessor message renewal strategy optimization

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit feb1f3077aef151f2eee9d34980ca47eebf2fc90
Author: loboxu <lo...@tencent.com>
AuthorDate: Thu Mar 9 23:05:22 2023 +0800

    ReceiptHandleProcessor message renewal strategy optimization
---
 .../proxy/common/MessageReceiptHandle.java         | 15 +++++----
 .../proxy/processor/ReceiptHandleProcessor.java    |  8 +++--
 .../processor/ReceiptHandleProcessorTest.java      | 37 ++++++++++------------
 3 files changed, 31 insertions(+), 29 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
index 8fa6583b2..e885cf4c2 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
@@ -21,7 +21,6 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.common.consumer.ReceiptHandle;
-import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
 
 public class MessageReceiptHandle {
     private final String group;
@@ -33,9 +32,9 @@ public class MessageReceiptHandle {
     private final int reconsumeTimes;
 
     private final AtomicInteger renewRetryTimes = new AtomicInteger(0);
+    private final AtomicInteger renewTimes = new AtomicInteger(0);
     private final long consumeTimestamp;
     private volatile String receiptHandleStr;
-    private final RetryPolicy renewStrategyPolicy;
 
     public MessageReceiptHandle(String group, String topic, int queueId, String receiptHandleStr, String messageId,
         long queueOffset, int reconsumeTimes) {
@@ -49,7 +48,6 @@ public class MessageReceiptHandle {
         this.queueOffset = queueOffset;
         this.reconsumeTimes = reconsumeTimes;
         this.consumeTimestamp = receiptHandle.getRetrieveTime();
-        this.renewStrategyPolicy = new RenewStrategyPolicy();
     }
 
     @Override
@@ -134,6 +132,14 @@ public class MessageReceiptHandle {
         return this.renewRetryTimes.incrementAndGet();
     }
 
+    public int incrementRenewTimes() {
+        return this.renewTimes.incrementAndGet();
+    }
+
+    public int getRenewTimes() {
+        return this.renewTimes.get();
+    }
+
     public void resetRenewRetryTimes() {
         this.renewRetryTimes.set(0);
     }
@@ -142,7 +148,4 @@ public class MessageReceiptHandle {
         return this.renewRetryTimes.get();
     }
 
-    public RetryPolicy getRenewStrategyPolicy() {
-        return this.renewStrategyPolicy;
-    }
 }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index 7b7982bab..357e94249 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -46,6 +46,7 @@ import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.common.ProxyException;
 import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
 import org.apache.rocketmq.proxy.common.ReceiptHandleGroup;
+import org.apache.rocketmq.proxy.common.RenewStrategyPolicy;
 import org.apache.rocketmq.proxy.common.StartAndShutdown;
 import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
 import org.apache.rocketmq.proxy.common.utils.ExceptionUtils;
@@ -175,10 +176,10 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
                 return CompletableFuture.completedFuture(null);
             }
             if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) {
-                RetryPolicy renewPolicy = messageReceiptHandle.getRenewStrategyPolicy();
+                RetryPolicy renewPolicy = new RenewStrategyPolicy();
                 CompletableFuture<AckResult> future =
                     messagingProcessor.changeInvisibleTime(context, handle, messageReceiptHandle.getMessageId(),
-                        messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), renewPolicy.nextDelayDuration(messageReceiptHandle.getRenewRetryTimes()));
+                        messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), renewPolicy.nextDelayDuration(messageReceiptHandle.getRenewTimes()));
                 future.whenComplete((ackResult, throwable) -> {
                     if (throwable != null) {
                         log.error("error when renew. handle:{}", messageReceiptHandle, throwable);
@@ -191,9 +192,10 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
                     } else if (AckStatus.OK.equals(ackResult.getStatus())) {
                         messageReceiptHandle.updateReceiptHandle(ackResult.getExtraInfo());
                         messageReceiptHandle.resetRenewRetryTimes();
+                        messageReceiptHandle.incrementRenewTimes();
                         resFuture.complete(messageReceiptHandle);
                     } else {
-                        log.error("renew response is not ok. result:{}", ackResult, messageReceiptHandle);
+                        log.error("renew response is not ok. result:{}, handle:{}", ackResult, messageReceiptHandle);
                         resFuture.complete(null);
                     }
                 });
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
index 6d77c1471..c0bff981f 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
@@ -124,7 +124,8 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
         SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
         Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig);
         Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
-        long newInvisibleTime = 2000L;
+        long newInvisibleTime = 18000L;
+
         ReceiptHandle newReceiptHandleClass = ReceiptHandle.builder()
             .startOffset(0L)
             .retrieveTime(System.currentTimeMillis() - newInvisibleTime + config.getRenewAheadTimeMillis() - 5)
@@ -137,20 +138,28 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
             .commitLogOffset(0L)
             .build();
         String newReceiptHandle = newReceiptHandleClass.encode();
+
+        RetryPolicy retryPolicy = new RenewStrategyPolicy();
+        AtomicInteger times = new AtomicInteger(0);
+
         AckResult ackResult = new AckResult();
         ackResult.setStatus(AckStatus.OK);
         ackResult.setExtraInfo(newReceiptHandle);
+
         Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
-            Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills())))
+            Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.get()))))
             .thenReturn(CompletableFuture.completedFuture(ackResult));
         receiptHandleProcessor.scheduleRenewTask();
+
         Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1))
             .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.argThat(r -> r.getInvisibleTime() == INVISIBLE_TIME), Mockito.eq(MESSAGE_ID),
-                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()));
+                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.get())));
         receiptHandleProcessor.scheduleRenewTask();
+
         Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1))
             .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.argThat(r -> r.getInvisibleTime() == newInvisibleTime), Mockito.eq(MESSAGE_ID),
-                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()));
+                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.incrementAndGet())));
+        receiptHandleProcessor.scheduleRenewTask();
     }
 
     @Test
@@ -164,20 +173,11 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
         ackResultFuture.completeExceptionally(new MQClientException(0, "error"));
 
         RetryPolicy retryPolicy = new RenewStrategyPolicy();
-        AtomicInteger times = new AtomicInteger(0);
 
         Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
-            Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement()))))
+            Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(messageReceiptHandle.getRenewTimes()))))
             .thenReturn(ackResultFuture);
 
-        Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
-                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement()))))
-                .thenReturn(ackResultFuture);
-
-        Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
-                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement()))))
-                .thenReturn(ackResultFuture);
-
         await().atMost(Duration.ofSeconds(1)).until(() -> {
             receiptHandleProcessor.scheduleRenewTask();
             try {
@@ -188,12 +188,9 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
             }
         });
 
-        times = new AtomicInteger(0);
-        for (int i = 0; i < config.getMaxRenewRetryTimes(); i++) {
-            Mockito.verify(messagingProcessor, Mockito.times(1))
-                    .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
-                            Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement())));
-        }
+        Mockito.verify(messagingProcessor, Mockito.times(3))
+            .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
+                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(messageReceiptHandle.getRenewTimes())));
     }
 
     @Test


[rocketmq] 06/06: ReceiptHandleProcessor message renewal strategy optimization

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 877f146f56b9ce58e267e434dbf0db68ca3bfc56
Author: loboxu <lo...@tencent.com>
AuthorDate: Mon Mar 13 20:06:21 2023 +0800

    ReceiptHandleProcessor message renewal strategy optimization
---
 .../org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index 357e94249..133097266 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -64,6 +64,7 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
         Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RenewalScheduledThread_"));
     protected ThreadPoolExecutor renewalWorkerService;
     protected final MessagingProcessor messagingProcessor;
+    protected final static RetryPolicy RENEW_POLICY = new RenewStrategyPolicy();
 
     public ReceiptHandleProcessor(MessagingProcessor messagingProcessor) {
         this.messagingProcessor = messagingProcessor;
@@ -176,10 +177,9 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
                 return CompletableFuture.completedFuture(null);
             }
             if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) {
-                RetryPolicy renewPolicy = new RenewStrategyPolicy();
                 CompletableFuture<AckResult> future =
                     messagingProcessor.changeInvisibleTime(context, handle, messageReceiptHandle.getMessageId(),
-                        messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), renewPolicy.nextDelayDuration(messageReceiptHandle.getRenewTimes()));
+                        messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()));
                 future.whenComplete((ackResult, throwable) -> {
                     if (throwable != null) {
                         log.error("error when renew. handle:{}", messageReceiptHandle, throwable);