You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2023/03/16 11:55:25 UTC
[rocketmq] 05/06: ReceiptHandleProcessor message renewal strategy optimization
This is an automated email from the ASF dual-hosted git repository.
yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit feb1f3077aef151f2eee9d34980ca47eebf2fc90
Author: loboxu <lo...@tencent.com>
AuthorDate: Thu Mar 9 23:05:22 2023 +0800
ReceiptHandleProcessor message renewal strategy optimization
---
.../proxy/common/MessageReceiptHandle.java | 15 +++++----
.../proxy/processor/ReceiptHandleProcessor.java | 8 +++--
.../processor/ReceiptHandleProcessorTest.java | 37 ++++++++++------------
3 files changed, 31 insertions(+), 29 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
index 8fa6583b2..e885cf4c2 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
@@ -21,7 +21,6 @@ import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.consumer.ReceiptHandle;
-import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
public class MessageReceiptHandle {
private final String group;
@@ -33,9 +32,9 @@ public class MessageReceiptHandle {
private final int reconsumeTimes;
private final AtomicInteger renewRetryTimes = new AtomicInteger(0);
+ private final AtomicInteger renewTimes = new AtomicInteger(0);
private final long consumeTimestamp;
private volatile String receiptHandleStr;
- private final RetryPolicy renewStrategyPolicy;
public MessageReceiptHandle(String group, String topic, int queueId, String receiptHandleStr, String messageId,
long queueOffset, int reconsumeTimes) {
@@ -49,7 +48,6 @@ public class MessageReceiptHandle {
this.queueOffset = queueOffset;
this.reconsumeTimes = reconsumeTimes;
this.consumeTimestamp = receiptHandle.getRetrieveTime();
- this.renewStrategyPolicy = new RenewStrategyPolicy();
}
@Override
@@ -134,6 +132,14 @@ public class MessageReceiptHandle {
return this.renewRetryTimes.incrementAndGet();
}
+ public int incrementRenewTimes() {
+ return this.renewTimes.incrementAndGet();
+ }
+
+ public int getRenewTimes() {
+ return this.renewTimes.get();
+ }
+
public void resetRenewRetryTimes() {
this.renewRetryTimes.set(0);
}
@@ -142,7 +148,4 @@ public class MessageReceiptHandle {
return this.renewRetryTimes.get();
}
- public RetryPolicy getRenewStrategyPolicy() {
- return this.renewStrategyPolicy;
- }
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index 7b7982bab..357e94249 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -46,6 +46,7 @@ import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.common.ProxyException;
import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
import org.apache.rocketmq.proxy.common.ReceiptHandleGroup;
+import org.apache.rocketmq.proxy.common.RenewStrategyPolicy;
import org.apache.rocketmq.proxy.common.StartAndShutdown;
import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
import org.apache.rocketmq.proxy.common.utils.ExceptionUtils;
@@ -175,10 +176,10 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
return CompletableFuture.completedFuture(null);
}
if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) {
- RetryPolicy renewPolicy = messageReceiptHandle.getRenewStrategyPolicy();
+ RetryPolicy renewPolicy = new RenewStrategyPolicy();
CompletableFuture<AckResult> future =
messagingProcessor.changeInvisibleTime(context, handle, messageReceiptHandle.getMessageId(),
- messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), renewPolicy.nextDelayDuration(messageReceiptHandle.getRenewRetryTimes()));
+ messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), renewPolicy.nextDelayDuration(messageReceiptHandle.getRenewTimes()));
future.whenComplete((ackResult, throwable) -> {
if (throwable != null) {
log.error("error when renew. handle:{}", messageReceiptHandle, throwable);
@@ -191,9 +192,10 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
} else if (AckStatus.OK.equals(ackResult.getStatus())) {
messageReceiptHandle.updateReceiptHandle(ackResult.getExtraInfo());
messageReceiptHandle.resetRenewRetryTimes();
+ messageReceiptHandle.incrementRenewTimes();
resFuture.complete(messageReceiptHandle);
} else {
- log.error("renew response is not ok. result:{}", ackResult, messageReceiptHandle);
+ log.error("renew response is not ok. result:{}, handle:{}", ackResult, messageReceiptHandle);
resFuture.complete(null);
}
});
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
index 6d77c1471..c0bff981f 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
@@ -124,7 +124,8 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig);
Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
- long newInvisibleTime = 2000L;
+ long newInvisibleTime = 18000L;
+
ReceiptHandle newReceiptHandleClass = ReceiptHandle.builder()
.startOffset(0L)
.retrieveTime(System.currentTimeMillis() - newInvisibleTime + config.getRenewAheadTimeMillis() - 5)
@@ -137,20 +138,28 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
.commitLogOffset(0L)
.build();
String newReceiptHandle = newReceiptHandleClass.encode();
+
+ RetryPolicy retryPolicy = new RenewStrategyPolicy();
+ AtomicInteger times = new AtomicInteger(0);
+
AckResult ackResult = new AckResult();
ackResult.setStatus(AckStatus.OK);
ackResult.setExtraInfo(newReceiptHandle);
+
Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
- Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills())))
+ Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.get()))))
.thenReturn(CompletableFuture.completedFuture(ackResult));
receiptHandleProcessor.scheduleRenewTask();
+
Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1))
.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.argThat(r -> r.getInvisibleTime() == INVISIBLE_TIME), Mockito.eq(MESSAGE_ID),
- Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()));
+ Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.get())));
receiptHandleProcessor.scheduleRenewTask();
+
Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1))
.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.argThat(r -> r.getInvisibleTime() == newInvisibleTime), Mockito.eq(MESSAGE_ID),
- Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()));
+ Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.incrementAndGet())));
+ receiptHandleProcessor.scheduleRenewTask();
}
@Test
@@ -164,20 +173,11 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
ackResultFuture.completeExceptionally(new MQClientException(0, "error"));
RetryPolicy retryPolicy = new RenewStrategyPolicy();
- AtomicInteger times = new AtomicInteger(0);
Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
- Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement()))))
+ Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(messageReceiptHandle.getRenewTimes()))))
.thenReturn(ackResultFuture);
- Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
- Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement()))))
- .thenReturn(ackResultFuture);
-
- Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
- Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement()))))
- .thenReturn(ackResultFuture);
-
await().atMost(Duration.ofSeconds(1)).until(() -> {
receiptHandleProcessor.scheduleRenewTask();
try {
@@ -188,12 +188,9 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
}
});
- times = new AtomicInteger(0);
- for (int i = 0; i < config.getMaxRenewRetryTimes(); i++) {
- Mockito.verify(messagingProcessor, Mockito.times(1))
- .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
- Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement())));
- }
+ Mockito.verify(messagingProcessor, Mockito.times(3))
+ .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
+ Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(messageReceiptHandle.getRenewTimes())));
}
@Test