You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2023/03/16 11:55:21 UTC
[rocketmq] 01/06: ReceiptHandleProcessor message renewal strategy optimization #6232
This is an automated email from the ASF dual-hosted git repository.
yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 667ebbcc142a0584c2f30dc0ae3c135017127529
Author: loboxu <lo...@tencent.com>
AuthorDate: Tue Mar 7 22:14:27 2023 +0800
ReceiptHandleProcessor message renewal strategy optimization #6232
---
.../proxy/common/MessageReceiptHandle.java | 7 +++
.../rocketmq/proxy/common/RenewStrategyPolicy.java | 70 ++++++++++++++++++++++
.../apache/rocketmq/proxy/config/ProxyConfig.java | 23 ++++---
.../grpc/v2/consumer/ReceiveMessageActivity.java | 2 +-
.../proxy/processor/ReceiptHandleProcessor.java | 3 +-
.../proxy/common/RenewStrategyPolicyTest.java | 66 ++++++++++++++++++++
.../processor/ReceiptHandleProcessorTest.java | 16 ++---
7 files changed, 168 insertions(+), 19 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
index 0b3c241d1..263d6157d 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
@@ -21,6 +21,7 @@ import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.consumer.ReceiptHandle;
+import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
public class MessageReceiptHandle {
private final String group;
@@ -34,6 +35,7 @@ public class MessageReceiptHandle {
private final AtomicInteger renewRetryTimes = new AtomicInteger(0);
private final long consumeTimestamp;
private volatile String receiptHandleStr;
+ private final RetryPolicy renewStrategyPolicy;
public MessageReceiptHandle(String group, String topic, int queueId, String receiptHandleStr, String messageId,
long queueOffset, int reconsumeTimes) {
@@ -47,6 +49,7 @@ public class MessageReceiptHandle {
this.queueOffset = queueOffset;
this.reconsumeTimes = reconsumeTimes;
this.consumeTimestamp = receiptHandle.getRetrieveTime();
+ this.renewStrategyPolicy = new RenewStrategyPolicy();
}
@Override
@@ -138,4 +141,8 @@ public class MessageReceiptHandle {
public int getRenewRetryTimes() {
return this.renewRetryTimes.get();
}
+
+ public RetryPolicy getRenewStrategyPolicy(){
+ return this.renewStrategyPolicy;
+ }
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewStrategyPolicy.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewStrategyPolicy.java
new file mode 100644
index 000000000..ce33619b4
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewStrategyPolicy.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.common;
+
+import com.google.common.base.MoreObjects;
+import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
+
+import java.util.concurrent.TimeUnit;
+
+
+public class RenewStrategyPolicy implements RetryPolicy {
+ // 1m 3m 5m 6m 10m 30m 1h
+ private long[] next = new long[]{
+ TimeUnit.MINUTES.toMillis(1),
+ TimeUnit.MINUTES.toMillis(3),
+ TimeUnit.MINUTES.toMillis(5),
+ TimeUnit.MINUTES.toMillis(10),
+ TimeUnit.MINUTES.toMillis(30),
+ TimeUnit.HOURS.toMillis(1)
+ };
+
+ public RenewStrategyPolicy() {
+ }
+
+ public RenewStrategyPolicy(long[] next) {
+ this.next = next;
+ }
+
+ public long[] getNext() {
+ return next;
+ }
+
+ public void setNext(long[] next) {
+ this.next = next;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("next", next)
+ .toString();
+ }
+
+ @Override
+ public long nextDelayDuration(int renewTimes) {
+ if (renewTimes < 0) {
+ renewTimes = 0;
+ }
+ int index = renewTimes;
+ if (index >= next.length) {
+ index = next.length - 1;
+ }
+ return next[index];
+ }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index c65877a41..dcbf1af0e 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -107,10 +107,16 @@ public class ProxyConfig implements ConfigFile {
*/
private int maxUserPropertySize = 16 * 1024;
private int userPropertyMaxNum = 128;
+
/**
* max message group size, 0 or negative number means no limit for proxy
*/
private int maxMessageGroupSize = 64;
+
+ /**
+ * When a message pops, the message is invisible by default
+ */
+ private long defaultInvisibleTimeMills = Duration.ofSeconds(60).toMillis();
private long minInvisibleTimeMillsForRecv = Duration.ofSeconds(10).toMillis();
private long maxInvisibleTimeMills = Duration.ofHours(12).toMillis();
private long maxDelayTimeMills = Duration.ofDays(1).toMillis();
@@ -180,7 +186,6 @@ public class ProxyConfig implements ConfigFile {
private int renewThreadPoolQueueCapacity = 300;
private long lockTimeoutMsInHandleGroup = TimeUnit.SECONDS.toMillis(3);
private long renewAheadTimeMillis = TimeUnit.SECONDS.toMillis(10);
- private long renewSliceTimeMillis = TimeUnit.SECONDS.toMillis(60);
private long renewMaxTimeMillis = TimeUnit.HOURS.toMillis(3);
private long renewSchedulePeriodMillis = TimeUnit.SECONDS.toMillis(5);
@@ -555,6 +560,14 @@ public class ProxyConfig implements ConfigFile {
this.minInvisibleTimeMillsForRecv = minInvisibleTimeMillsForRecv;
}
+ public long getDefaultInvisibleTimeMills() {
+ return defaultInvisibleTimeMills;
+ }
+
+ public void setDefaultInvisibleTimeMills(long defaultInvisibleTimeMills) {
+ this.defaultInvisibleTimeMills = defaultInvisibleTimeMills;
+ }
+
public long getMaxInvisibleTimeMills() {
return maxInvisibleTimeMills;
}
@@ -1019,14 +1032,6 @@ public class ProxyConfig implements ConfigFile {
this.renewAheadTimeMillis = renewAheadTimeMillis;
}
- public long getRenewSliceTimeMillis() {
- return renewSliceTimeMillis;
- }
-
- public void setRenewSliceTimeMillis(long renewSliceTimeMillis) {
- this.renewSliceTimeMillis = renewSliceTimeMillis;
- }
-
public long getRenewMaxTimeMillis() {
return renewMaxTimeMillis;
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
index ddbe07083..9df4101f7 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
@@ -103,7 +103,7 @@ public class ReceiveMessageActivity extends AbstractMessingActivity {
long actualInvisibleTime = Durations.toMillis(request.getInvisibleDuration());
ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
if (proxyConfig.isEnableProxyAutoRenew() && request.getAutoRenew()) {
- actualInvisibleTime = proxyConfig.getRenewSliceTimeMillis();
+ actualInvisibleTime = proxyConfig.getDefaultInvisibleTimeMills();
} else {
validateInvisibleTime(actualInvisibleTime,
ConfigurationManager.getProxyConfig().getMinInvisibleTimeMillsForRecv());
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index bbd507070..b0a4e8414 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -175,9 +175,10 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
return CompletableFuture.completedFuture(null);
}
if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) {
+ RetryPolicy renewPolicy = messageReceiptHandle.getRenewStrategyPolicy();
CompletableFuture<AckResult> future =
messagingProcessor.changeInvisibleTime(context, handle, messageReceiptHandle.getMessageId(),
- messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), proxyConfig.getRenewSliceTimeMillis());
+ messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), renewPolicy.nextDelayDuration(messageReceiptHandle.getRenewRetryTimes()));
future.whenComplete((ackResult, throwable) -> {
if (throwable != null) {
log.error("error when renew. handle:{}", messageReceiptHandle, throwable);
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/common/RenewStrategyPolicyTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/common/RenewStrategyPolicyTest.java
new file mode 100644
index 000000000..54e627274
--- /dev/null
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/common/RenewStrategyPolicyTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.common;
+
+import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+
+public class RenewStrategyPolicyTest {
+
+ private RetryPolicy retryPolicy;
+ private final AtomicInteger times = new AtomicInteger(0);
+
+ @Before
+ public void before() throws Throwable {
+ this.retryPolicy = new RenewStrategyPolicy();
+ }
+
+ @Test
+ public void testNextDelayDuration() {
+ long value = this.retryPolicy.nextDelayDuration(times.getAndIncrement());
+ assertEquals(value, TimeUnit.MINUTES.toMillis(1));
+
+ value = this.retryPolicy.nextDelayDuration(times.getAndIncrement());
+ assertEquals(value, TimeUnit.MINUTES.toMillis(3));
+
+ value = this.retryPolicy.nextDelayDuration(times.getAndIncrement());
+ assertEquals(value, TimeUnit.MINUTES.toMillis(5));
+
+ value = this.retryPolicy.nextDelayDuration(times.getAndIncrement());
+ assertEquals(value, TimeUnit.MINUTES.toMillis(10));
+
+ value = this.retryPolicy.nextDelayDuration(times.getAndIncrement());
+ assertEquals(value, TimeUnit.MINUTES.toMillis(30));
+
+ value = this.retryPolicy.nextDelayDuration(times.getAndIncrement());
+ assertEquals(value, TimeUnit.HOURS.toMillis(1));
+ }
+
+
+ @After
+ public void after() {
+ }
+
+}
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
index 33057da6e..355596ba1 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
@@ -111,7 +111,7 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
receiptHandleProcessor.scheduleRenewTask();
Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1))
.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
- Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis()));
+ Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()));
}
@Test
@@ -139,16 +139,16 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
ackResult.setStatus(AckStatus.OK);
ackResult.setExtraInfo(newReceiptHandle);
Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
- Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis())))
+ Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills())))
.thenReturn(CompletableFuture.completedFuture(ackResult));
receiptHandleProcessor.scheduleRenewTask();
Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1))
.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.argThat(r -> r.getInvisibleTime() == INVISIBLE_TIME), Mockito.eq(MESSAGE_ID),
- Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis()));
+ Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()));
receiptHandleProcessor.scheduleRenewTask();
Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1))
.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.argThat(r -> r.getInvisibleTime() == newInvisibleTime), Mockito.eq(MESSAGE_ID),
- Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis()));
+ Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()));
}
@Test
@@ -161,7 +161,7 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>();
ackResultFuture.completeExceptionally(new MQClientException(0, "error"));
Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
- Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis())))
+ Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills())))
.thenReturn(ackResultFuture);
await().atMost(Duration.ofSeconds(1)).until(() -> {
@@ -175,7 +175,7 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
});
Mockito.verify(messagingProcessor, Mockito.times(config.getMaxRenewRetryTimes()))
.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
- Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis()));
+ Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()));
}
@Test
@@ -187,7 +187,7 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>();
ackResultFuture.completeExceptionally(new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "error"));
Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
- Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis())))
+ Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills())))
.thenReturn(ackResultFuture);
await().atMost(Duration.ofSeconds(1)).until(() -> {
@@ -246,7 +246,7 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
Mockito.doAnswer((Answer<CompletableFuture<AckResult>>) mock -> {
return futureList.get(count.getAndIncrement());
}).when(messagingProcessor).changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
- Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis()));
+ Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()));
await().pollDelay(Duration.ZERO).pollInterval(Duration.ofMillis(10)).atMost(Duration.ofSeconds(10)).until(() -> {
receiptHandleProcessor.scheduleRenewTask();
try {