You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2023/03/16 11:55:21 UTC

[rocketmq] 01/06: ReceiptHandleProcessor message renewal strategy optimization #6232

This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 667ebbcc142a0584c2f30dc0ae3c135017127529
Author: loboxu <lo...@tencent.com>
AuthorDate: Tue Mar 7 22:14:27 2023 +0800

    ReceiptHandleProcessor message renewal strategy optimization #6232
---
 .../proxy/common/MessageReceiptHandle.java         |  7 +++
 .../rocketmq/proxy/common/RenewStrategyPolicy.java | 70 ++++++++++++++++++++++
 .../apache/rocketmq/proxy/config/ProxyConfig.java  | 23 ++++---
 .../grpc/v2/consumer/ReceiveMessageActivity.java   |  2 +-
 .../proxy/processor/ReceiptHandleProcessor.java    |  3 +-
 .../proxy/common/RenewStrategyPolicyTest.java      | 66 ++++++++++++++++++++
 .../processor/ReceiptHandleProcessorTest.java      | 16 ++---
 7 files changed, 168 insertions(+), 19 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
index 0b3c241d1..263d6157d 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
@@ -21,6 +21,7 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.common.consumer.ReceiptHandle;
+import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
 
 public class MessageReceiptHandle {
     private final String group;
@@ -34,6 +35,7 @@ public class MessageReceiptHandle {
     private final AtomicInteger renewRetryTimes = new AtomicInteger(0);
     private final long consumeTimestamp;
     private volatile String receiptHandleStr;
+    private final RetryPolicy renewStrategyPolicy;
 
     public MessageReceiptHandle(String group, String topic, int queueId, String receiptHandleStr, String messageId,
         long queueOffset, int reconsumeTimes) {
@@ -47,6 +49,7 @@ public class MessageReceiptHandle {
         this.queueOffset = queueOffset;
         this.reconsumeTimes = reconsumeTimes;
         this.consumeTimestamp = receiptHandle.getRetrieveTime();
+        this.renewStrategyPolicy = new RenewStrategyPolicy();
     }
 
     @Override
@@ -138,4 +141,8 @@ public class MessageReceiptHandle {
     public int getRenewRetryTimes() {
         return this.renewRetryTimes.get();
     }
+
+    public RetryPolicy getRenewStrategyPolicy(){
+        return this.renewStrategyPolicy;
+    }
 }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewStrategyPolicy.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewStrategyPolicy.java
new file mode 100644
index 000000000..ce33619b4
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewStrategyPolicy.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.common;
+
+import com.google.common.base.MoreObjects;
+import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
+
+import java.util.concurrent.TimeUnit;
+
+
+public class RenewStrategyPolicy implements RetryPolicy {
+    // 1m 3m 5m 6m 10m 30m 1h
+    private long[] next = new long[]{
+            TimeUnit.MINUTES.toMillis(1),
+            TimeUnit.MINUTES.toMillis(3),
+            TimeUnit.MINUTES.toMillis(5),
+            TimeUnit.MINUTES.toMillis(10),
+            TimeUnit.MINUTES.toMillis(30),
+            TimeUnit.HOURS.toMillis(1)
+    };
+
+    public RenewStrategyPolicy() {
+    }
+
+    public RenewStrategyPolicy(long[] next) {
+        this.next = next;
+    }
+
+    public long[] getNext() {
+        return next;
+    }
+
+    public void setNext(long[] next) {
+        this.next = next;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+                .add("next", next)
+                .toString();
+    }
+
+    @Override
+    public long nextDelayDuration(int renewTimes) {
+        if (renewTimes < 0) {
+            renewTimes = 0;
+        }
+        int index = renewTimes;
+        if (index >= next.length) {
+            index = next.length - 1;
+        }
+        return next[index];
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index c65877a41..dcbf1af0e 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -107,10 +107,16 @@ public class ProxyConfig implements ConfigFile {
      */
     private int maxUserPropertySize = 16 * 1024;
     private int userPropertyMaxNum = 128;
+
     /**
      * max message group size, 0 or negative number means no limit for proxy
      */
     private int maxMessageGroupSize = 64;
+
+    /**
+     * When a message pops, the message is invisible by default
+     */
+    private long defaultInvisibleTimeMills = Duration.ofSeconds(60).toMillis();
     private long minInvisibleTimeMillsForRecv = Duration.ofSeconds(10).toMillis();
     private long maxInvisibleTimeMills = Duration.ofHours(12).toMillis();
     private long maxDelayTimeMills = Duration.ofDays(1).toMillis();
@@ -180,7 +186,6 @@ public class ProxyConfig implements ConfigFile {
     private int renewThreadPoolQueueCapacity = 300;
     private long lockTimeoutMsInHandleGroup = TimeUnit.SECONDS.toMillis(3);
     private long renewAheadTimeMillis = TimeUnit.SECONDS.toMillis(10);
-    private long renewSliceTimeMillis = TimeUnit.SECONDS.toMillis(60);
     private long renewMaxTimeMillis = TimeUnit.HOURS.toMillis(3);
     private long renewSchedulePeriodMillis = TimeUnit.SECONDS.toMillis(5);
 
@@ -555,6 +560,14 @@ public class ProxyConfig implements ConfigFile {
         this.minInvisibleTimeMillsForRecv = minInvisibleTimeMillsForRecv;
     }
 
+    public long getDefaultInvisibleTimeMills() {
+        return defaultInvisibleTimeMills;
+    }
+
+    public void setDefaultInvisibleTimeMills(long defaultInvisibleTimeMills) {
+        this.defaultInvisibleTimeMills = defaultInvisibleTimeMills;
+    }
+
     public long getMaxInvisibleTimeMills() {
         return maxInvisibleTimeMills;
     }
@@ -1019,14 +1032,6 @@ public class ProxyConfig implements ConfigFile {
         this.renewAheadTimeMillis = renewAheadTimeMillis;
     }
 
-    public long getRenewSliceTimeMillis() {
-        return renewSliceTimeMillis;
-    }
-
-    public void setRenewSliceTimeMillis(long renewSliceTimeMillis) {
-        this.renewSliceTimeMillis = renewSliceTimeMillis;
-    }
-
     public long getRenewMaxTimeMillis() {
         return renewMaxTimeMillis;
     }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
index ddbe07083..9df4101f7 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
@@ -103,7 +103,7 @@ public class ReceiveMessageActivity extends AbstractMessingActivity {
             long actualInvisibleTime = Durations.toMillis(request.getInvisibleDuration());
             ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
             if (proxyConfig.isEnableProxyAutoRenew() && request.getAutoRenew()) {
-                actualInvisibleTime = proxyConfig.getRenewSliceTimeMillis();
+                actualInvisibleTime = proxyConfig.getDefaultInvisibleTimeMills();
             } else {
                 validateInvisibleTime(actualInvisibleTime,
                     ConfigurationManager.getProxyConfig().getMinInvisibleTimeMillsForRecv());
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index bbd507070..b0a4e8414 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -175,9 +175,10 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
                 return CompletableFuture.completedFuture(null);
             }
             if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) {
+                RetryPolicy renewPolicy = messageReceiptHandle.getRenewStrategyPolicy();
                 CompletableFuture<AckResult> future =
                     messagingProcessor.changeInvisibleTime(context, handle, messageReceiptHandle.getMessageId(),
-                        messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), proxyConfig.getRenewSliceTimeMillis());
+                        messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), renewPolicy.nextDelayDuration(messageReceiptHandle.getRenewRetryTimes()));
                 future.whenComplete((ackResult, throwable) -> {
                     if (throwable != null) {
                         log.error("error when renew. handle:{}", messageReceiptHandle, throwable);
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/common/RenewStrategyPolicyTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/common/RenewStrategyPolicyTest.java
new file mode 100644
index 000000000..54e627274
--- /dev/null
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/common/RenewStrategyPolicyTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.common;
+
+import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+
+public class RenewStrategyPolicyTest {
+
+    private RetryPolicy retryPolicy;
+    private final AtomicInteger times = new AtomicInteger(0);
+
+    @Before
+    public void before() throws Throwable {
+        this.retryPolicy = new RenewStrategyPolicy();
+    }
+
+    @Test
+    public void testNextDelayDuration() {
+        long value = this.retryPolicy.nextDelayDuration(times.getAndIncrement());
+        assertEquals(value, TimeUnit.MINUTES.toMillis(1));
+
+        value = this.retryPolicy.nextDelayDuration(times.getAndIncrement());
+        assertEquals(value, TimeUnit.MINUTES.toMillis(3));
+
+        value = this.retryPolicy.nextDelayDuration(times.getAndIncrement());
+        assertEquals(value, TimeUnit.MINUTES.toMillis(5));
+
+        value = this.retryPolicy.nextDelayDuration(times.getAndIncrement());
+        assertEquals(value, TimeUnit.MINUTES.toMillis(10));
+
+        value = this.retryPolicy.nextDelayDuration(times.getAndIncrement());
+        assertEquals(value, TimeUnit.MINUTES.toMillis(30));
+
+        value = this.retryPolicy.nextDelayDuration(times.getAndIncrement());
+        assertEquals(value, TimeUnit.HOURS.toMillis(1));
+    }
+
+
+    @After
+    public void after() {
+    }
+
+}
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
index 33057da6e..355596ba1 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
@@ -111,7 +111,7 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
         receiptHandleProcessor.scheduleRenewTask();
         Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1))
             .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
-                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis()));
+                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()));
     }
 
     @Test
@@ -139,16 +139,16 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
         ackResult.setStatus(AckStatus.OK);
         ackResult.setExtraInfo(newReceiptHandle);
         Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
-            Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis())))
+            Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills())))
             .thenReturn(CompletableFuture.completedFuture(ackResult));
         receiptHandleProcessor.scheduleRenewTask();
         Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1))
             .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.argThat(r -> r.getInvisibleTime() == INVISIBLE_TIME), Mockito.eq(MESSAGE_ID),
-                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis()));
+                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()));
         receiptHandleProcessor.scheduleRenewTask();
         Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1))
             .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.argThat(r -> r.getInvisibleTime() == newInvisibleTime), Mockito.eq(MESSAGE_ID),
-                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis()));
+                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()));
     }
 
     @Test
@@ -161,7 +161,7 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
         CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>();
         ackResultFuture.completeExceptionally(new MQClientException(0, "error"));
         Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
-            Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis())))
+            Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills())))
             .thenReturn(ackResultFuture);
 
         await().atMost(Duration.ofSeconds(1)).until(() -> {
@@ -175,7 +175,7 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
         });
         Mockito.verify(messagingProcessor, Mockito.times(config.getMaxRenewRetryTimes()))
             .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
-                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis()));
+                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()));
     }
 
     @Test
@@ -187,7 +187,7 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
         CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>();
         ackResultFuture.completeExceptionally(new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "error"));
         Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
-            Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis())))
+            Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills())))
             .thenReturn(ackResultFuture);
 
         await().atMost(Duration.ofSeconds(1)).until(() -> {
@@ -246,7 +246,7 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
         Mockito.doAnswer((Answer<CompletableFuture<AckResult>>) mock -> {
             return futureList.get(count.getAndIncrement());
         }).when(messagingProcessor).changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
-            Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis()));
+            Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()));
         await().pollDelay(Duration.ZERO).pollInterval(Duration.ofMillis(10)).atMost(Duration.ofSeconds(10)).until(() -> {
             receiptHandleProcessor.scheduleRenewTask();
             try {