You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/09/01 02:11:53 UTC
[rocketmq] branch develop updated: [ISSUE #4938, ISSUE #4939] fix the concurrency problems when renew and ack/changeInvisibleTime run at the same time; retry renew a limited number of times when an error occurs (#4940)
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new cafb31961 [ISSUE #4938, ISSUE #4939] fix the concurrency problems when renew and ack/changeInvisibleTime run at the same time; retry renew a limited number of times when an error occurs (#4940)
cafb31961 is described below
commit cafb31961271785a26603a0726fc62393805cd96
Author: lk <ka...@alibaba-inc.com>
AuthorDate: Thu Sep 1 10:11:46 2022 +0800
[ISSUE #4938, ISSUE #4939] fix the concurrency problems when renew and ack/changeInvisibleTime run at the same time; retry renew a limited number of times when an error occurs (#4940)
---
.../proxy/common/MessageReceiptHandle.java | 66 ++--
.../rocketmq/proxy/common/ReceiptHandleGroup.java | 149 +++++++--
.../apache/rocketmq/proxy/config/ProxyConfig.java | 45 +++
.../proxy/grpc/v2/consumer/AckMessageActivity.java | 2 +-
.../consumer/ChangeInvisibleDurationActivity.java | 2 +-
.../grpc/v2/consumer/ReceiveMessageActivity.java | 2 +-
.../v2/producer/ForwardMessageToDLQActivity.java | 2 +-
.../proxy/processor/ReceiptHandleProcessor.java | 341 +++++++++++++--------
.../proxy/common/ReceiptHandleGroupTest.java | 177 +++++++++++
.../ChangeInvisibleDurationActivityTest.java | 2 +-
.../producer/ForwardMessageToDLQActivityTest.java | 2 +-
.../processor/ReceiptHandleProcessorTest.java | 222 ++++++++++++--
12 files changed, 813 insertions(+), 199 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
index 64e7a122a..379e644f7 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
@@ -19,6 +19,8 @@ package org.apache.rocketmq.proxy.common;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.common.consumer.ReceiptHandle;
public class MessageReceiptHandle {
private final String group;
@@ -26,25 +28,27 @@ public class MessageReceiptHandle {
private final int queueId;
private final String messageId;
private final long queueOffset;
- private final String originalReceiptHandle;
- private final long timestamp;
+ private final String originalReceiptHandleStr;
private final int reconsumeTimes;
- private final long expectInvisibleTime;
- private String receiptHandle;
+ private final AtomicInteger renewRetryTimes = new AtomicInteger(0);
+ private volatile long timestamp;
+ private volatile long expectInvisibleTime;
+ private volatile String receiptHandleStr;
- public MessageReceiptHandle(String group, String topic, int queueId, String receiptHandle, String messageId,
- long queueOffset, int reconsumeTimes, long expectInvisibleTime) {
+ public MessageReceiptHandle(String group, String topic, int queueId, String receiptHandleStr, String messageId,
+ long queueOffset, int reconsumeTimes) {
+ ReceiptHandle receiptHandle = ReceiptHandle.decode(receiptHandleStr);
this.group = group;
this.topic = topic;
this.queueId = queueId;
- this.receiptHandle = receiptHandle;
- this.originalReceiptHandle = receiptHandle;
+ this.receiptHandleStr = receiptHandleStr;
+ this.originalReceiptHandleStr = receiptHandleStr;
this.messageId = messageId;
this.queueOffset = queueOffset;
this.reconsumeTimes = reconsumeTimes;
- this.expectInvisibleTime = expectInvisibleTime;
- this.timestamp = System.currentTimeMillis();
+ this.expectInvisibleTime = receiptHandle.getInvisibleTime();
+ this.timestamp = receiptHandle.getRetrieveTime();
}
@Override
@@ -59,14 +63,14 @@ public class MessageReceiptHandle {
return queueId == handle.queueId && queueOffset == handle.queueOffset && timestamp == handle.timestamp
&& reconsumeTimes == handle.reconsumeTimes && expectInvisibleTime == handle.expectInvisibleTime
&& Objects.equal(group, handle.group) && Objects.equal(topic, handle.topic)
- && Objects.equal(messageId, handle.messageId) && Objects.equal(originalReceiptHandle, handle.originalReceiptHandle)
- && Objects.equal(receiptHandle, handle.receiptHandle);
+ && Objects.equal(messageId, handle.messageId) && Objects.equal(originalReceiptHandleStr, handle.originalReceiptHandleStr)
+ && Objects.equal(receiptHandleStr, handle.receiptHandleStr);
}
@Override
public int hashCode() {
- return Objects.hashCode(group, topic, queueId, messageId, queueOffset, originalReceiptHandle, timestamp,
- reconsumeTimes, expectInvisibleTime, receiptHandle);
+ return Objects.hashCode(group, topic, queueId, messageId, queueOffset, originalReceiptHandleStr, timestamp,
+ reconsumeTimes, expectInvisibleTime, receiptHandleStr);
}
@Override
@@ -77,11 +81,12 @@ public class MessageReceiptHandle {
.add("queueId", queueId)
.add("messageId", messageId)
.add("queueOffset", queueOffset)
- .add("originalReceiptHandle", originalReceiptHandle)
- .add("timestamp", timestamp)
+ .add("originalReceiptHandleStr", originalReceiptHandleStr)
.add("reconsumeTimes", reconsumeTimes)
+ .add("renewRetryTimes", renewRetryTimes)
+ .add("timestamp", timestamp)
.add("expectInvisibleTime", expectInvisibleTime)
- .add("receiptHandle", receiptHandle)
+ .add("receiptHandleStr", receiptHandleStr)
.toString();
}
@@ -97,12 +102,12 @@ public class MessageReceiptHandle {
return queueId;
}
- public String getReceiptHandle() {
- return receiptHandle;
+ public String getReceiptHandleStr() {
+ return receiptHandleStr;
}
- public String getOriginalReceiptHandle() {
- return originalReceiptHandle;
+ public String getOriginalReceiptHandleStr() {
+ return originalReceiptHandleStr;
}
public String getMessageId() {
@@ -125,7 +130,22 @@ public class MessageReceiptHandle {
return expectInvisibleTime;
}
- public void update(String receiptHandle) {
- this.receiptHandle = receiptHandle;
+ public void updateReceiptHandle(String receiptHandleStr) {
+ ReceiptHandle receiptHandle = ReceiptHandle.decode(receiptHandleStr);
+ this.receiptHandleStr = receiptHandleStr;
+ this.expectInvisibleTime = receiptHandle.getInvisibleTime();
+ this.timestamp = receiptHandle.getRetrieveTime();
+ }
+
+ public int incrementAndGetRenewRetryTimes() {
+ return this.renewRetryTimes.incrementAndGet();
+ }
+
+ public void resetRenewRetryTimes() {
+ this.renewRetryTimes.set(0);
+ }
+
+ public int getRenewRetryTimes() {
+ return this.renewRetryTimes.get();
}
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
index ce68fb2db..d2f447273 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
@@ -17,17 +17,81 @@
package org.apache.rocketmq.proxy.common;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
import java.util.Map;
-import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
public class ReceiptHandleGroup {
- private final Map<String /* msgID */, Map<String /* original handle */, MessageReceiptHandle>> receiptHandleMap = new ConcurrentHashMap<>();
+ protected final Map<String /* msgID */, Map<String /* original handle */, HandleData>> receiptHandleMap = new ConcurrentHashMap<>();
+
+ public static class HandleData {
+ private final Semaphore semaphore = new Semaphore(1);
+ private volatile boolean needRemove = false;
+ private volatile MessageReceiptHandle messageReceiptHandle;
+
+ public HandleData(MessageReceiptHandle messageReceiptHandle) {
+ this.messageReceiptHandle = messageReceiptHandle;
+ }
+
+ public boolean lock(long timeoutMs) {
+ try {
+ return this.semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ return false;
+ }
+ }
+
+ public void unlock() {
+ this.semaphore.release();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return this == o;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(semaphore, needRemove, messageReceiptHandle);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("semaphore", semaphore)
+ .add("needRemove", needRemove)
+ .add("messageReceiptHandle", messageReceiptHandle)
+ .toString();
+ }
+ }
public void put(String msgID, String handle, MessageReceiptHandle value) {
- Map<String, MessageReceiptHandle> handleMap = receiptHandleMap.computeIfAbsent(msgID, msgIDKey -> new ConcurrentHashMap<>());
- handleMap.put(handle, value);
+ long timeout = ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup();
+ Map<String, HandleData> handleMap = receiptHandleMap.computeIfAbsent(msgID, msgIDKey -> new ConcurrentHashMap<>());
+ handleMap.compute(handle, (handleKey, handleData) -> {
+ if (handleData == null || handleData.needRemove) {
+ return new HandleData(value);
+ }
+ if (!handleData.lock(timeout)) {
+ throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "try to put handle failed");
+ }
+ try {
+ if (handleData.needRemove) {
+ return new HandleData(value);
+ }
+ handleData.messageReceiptHandle = value;
+ } finally {
+ handleData.unlock();
+ }
+ return handleData;
+ });
}
public boolean isEmpty() {
@@ -35,31 +99,71 @@ public class ReceiptHandleGroup {
}
public MessageReceiptHandle remove(String msgID, String handle) {
- AtomicReference<MessageReceiptHandle> resRef = new AtomicReference<>();
- receiptHandleMap.computeIfPresent(msgID, (msgIDKey, handleMap) -> {
- resRef.set(handleMap.remove(handle));
- if (handleMap.isEmpty()) {
+ Map<String, HandleData> handleMap = this.receiptHandleMap.get(msgID);
+ if (handleMap == null) {
+ return null;
+ }
+ long timeout = ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup();
+ AtomicReference<MessageReceiptHandle> res = new AtomicReference<>();
+ handleMap.computeIfPresent(handle, (handleKey, handleData) -> {
+ if (!handleData.lock(timeout)) {
+ throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "try to remove and get handle failed");
+ }
+ try {
+ if (!handleData.needRemove) {
+ handleData.needRemove = true;
+ res.set(handleData.messageReceiptHandle);
+ }
return null;
+ } finally {
+ handleData.unlock();
}
- return handleMap;
});
- return resRef.get();
+ removeHandleMapKeyIfNeed(msgID);
+ return res.get();
}
- public MessageReceiptHandle removeOne(String msgID) {
- AtomicReference<MessageReceiptHandle> resRef = new AtomicReference<>();
- receiptHandleMap.computeIfPresent(msgID, (msgIDKey, handleMap) -> {
- if (handleMap.isEmpty()) {
- return null;
+ public void computeIfPresent(String msgID, String handle,
+ Function<MessageReceiptHandle, CompletableFuture<MessageReceiptHandle>> function) {
+ Map<String, HandleData> handleMap = this.receiptHandleMap.get(msgID);
+ if (handleMap == null) {
+ return;
+ }
+ long timeout = ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup();
+ handleMap.computeIfPresent(handle, (handleKey, handleData) -> {
+ if (!handleData.lock(timeout)) {
+ throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "try to compute failed");
}
- Optional<String> handleKey = handleMap.keySet().stream().findAny();
- resRef.set(handleMap.remove(handleKey.get()));
+ CompletableFuture<MessageReceiptHandle> future = function.apply(handleData.messageReceiptHandle);
+ future.whenComplete((messageReceiptHandle, throwable) -> {
+ try {
+ if (throwable != null) {
+ return;
+ }
+ if (messageReceiptHandle == null) {
+ handleData.needRemove = true;
+ } else {
+ handleData.messageReceiptHandle = messageReceiptHandle;
+ }
+ } finally {
+ handleData.unlock();
+ }
+ if (handleData.needRemove) {
+ handleMap.remove(handleKey, handleData);
+ }
+ removeHandleMapKeyIfNeed(msgID);
+ });
+ return handleData;
+ });
+ }
+
+ protected void removeHandleMapKeyIfNeed(String msgID) {
+ this.receiptHandleMap.computeIfPresent(msgID, (msgIDKey, handleMap) -> {
if (handleMap.isEmpty()) {
return null;
}
return handleMap;
});
- return resRef.get();
}
public interface DataScanner {
@@ -69,8 +173,15 @@ public class ReceiptHandleGroup {
public void scan(DataScanner scanner) {
this.receiptHandleMap.forEach((msgID, handleMap) -> {
handleMap.forEach((handleStr, v) -> {
- scanner.onData(msgID, handleStr, v);
+ scanner.onData(msgID, handleStr, v.messageReceiptHandle);
});
});
}
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("receiptHandleMap", receiptHandleMap)
+ .toString();
+ }
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index 9573c3aa0..5a605f28b 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -140,6 +140,11 @@ public class ProxyConfig implements ConfigFile {
private long invisibleTimeMillisWhenClear = 1000L;
private boolean enableProxyAutoRenew = true;
+ private int maxRenewRetryTimes = 3;
+ private int renewThreadPoolNums = 2;
+ private int renewMaxThreadPoolNums = 4;
+ private int renewThreadPoolQueueCapacity = 300;
+ private long lockTimeoutMsInHandleGroup = TimeUnit.SECONDS.toMillis(3);
private long renewAheadTimeMillis = TimeUnit.SECONDS.toMillis(10);
private long renewSliceTimeMillis = TimeUnit.SECONDS.toMillis(60);
private long renewMaxTimeMillis = TimeUnit.HOURS.toMillis(3);
@@ -789,6 +794,46 @@ public class ProxyConfig implements ConfigFile {
this.enableProxyAutoRenew = enableProxyAutoRenew;
}
+ public int getMaxRenewRetryTimes() {
+ return maxRenewRetryTimes;
+ }
+
+ public void setMaxRenewRetryTimes(int maxRenewRetryTimes) {
+ this.maxRenewRetryTimes = maxRenewRetryTimes;
+ }
+
+ public int getRenewThreadPoolNums() {
+ return renewThreadPoolNums;
+ }
+
+ public void setRenewThreadPoolNums(int renewThreadPoolNums) {
+ this.renewThreadPoolNums = renewThreadPoolNums;
+ }
+
+ public int getRenewMaxThreadPoolNums() {
+ return renewMaxThreadPoolNums;
+ }
+
+ public void setRenewMaxThreadPoolNums(int renewMaxThreadPoolNums) {
+ this.renewMaxThreadPoolNums = renewMaxThreadPoolNums;
+ }
+
+ public int getRenewThreadPoolQueueCapacity() {
+ return renewThreadPoolQueueCapacity;
+ }
+
+ public void setRenewThreadPoolQueueCapacity(int renewThreadPoolQueueCapacity) {
+ this.renewThreadPoolQueueCapacity = renewThreadPoolQueueCapacity;
+ }
+
+ public long getLockTimeoutMsInHandleGroup() {
+ return lockTimeoutMsInHandleGroup;
+ }
+
+ public void setLockTimeoutMsInHandleGroup(long lockTimeoutMsInHandleGroup) {
+ this.lockTimeoutMsInHandleGroup = lockTimeoutMsInHandleGroup;
+ }
+
public long getRenewAheadTimeMillis() {
return renewAheadTimeMillis;
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
index 5f4c85af7..fa2241cb3 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
@@ -100,7 +100,7 @@ public class AckMessageActivity extends AbstractMessingActivity {
String group = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup());
MessageReceiptHandle messageReceiptHandle = receiptHandleProcessor.removeReceiptHandle(ctx.getClientID(), group, ackMessageEntry.getMessageId(), ackMessageEntry.getReceiptHandle());
if (messageReceiptHandle != null) {
- handleString = messageReceiptHandle.getReceiptHandle();
+ handleString = messageReceiptHandle.getReceiptHandleStr();
}
CompletableFuture<AckResult> ackResultFuture = this.messagingProcessor.ackMessage(
ctx,
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java
index 367e5aa33..680364072 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java
@@ -57,7 +57,7 @@ public class ChangeInvisibleDurationActivity extends AbstractMessingActivity {
MessageReceiptHandle messageReceiptHandle = receiptHandleProcessor.removeReceiptHandle(ctx.getClientID(), group, request.getMessageId(), receiptHandle.getReceiptHandle());
if (messageReceiptHandle != null) {
- receiptHandle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandle());
+ receiptHandle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr());
}
return this.messagingProcessor.changeInvisibleTime(
ctx,
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
index 555a036ee..49763dcdb 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
@@ -123,7 +123,7 @@ public class ReceiveMessageActivity extends AbstractMessingActivity {
if (receiptHandle != null) {
MessageReceiptHandle messageReceiptHandle =
new MessageReceiptHandle(group, topic, messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(),
- messageExt.getQueueOffset(), messageExt.getReconsumeTimes(), proxyConfig.getRenewMaxTimeMillis());
+ messageExt.getQueueOffset(), messageExt.getReconsumeTimes());
receiptHandleProcessor.addReceiptHandle(ctx.getClientID(), group, messageExt.getMsgId(), receiptHandle, messageReceiptHandle);
}
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java
index dd5d534e2..dec52f3c2 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java
@@ -50,7 +50,7 @@ public class ForwardMessageToDLQActivity extends AbstractMessingActivity {
String handleString = request.getReceiptHandle();
MessageReceiptHandle messageReceiptHandle = receiptHandleProcessor.removeReceiptHandle(ctx.getClientID(), group, request.getMessageId(), request.getReceiptHandle());
if (messageReceiptHandle != null) {
- handleString = messageReceiptHandle.getReceiptHandle();
+ handleString = messageReceiptHandle.getReceiptHandleStr();
}
ReceiptHandle receiptHandle = ReceiptHandle.decode(handleString);
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index dc91061fd..9bb8b7f9f 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -17,16 +17,18 @@
package org.apache.rocketmq.proxy.processor;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import com.google.common.base.Stopwatch;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
@@ -41,8 +43,11 @@ import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
import org.apache.rocketmq.proxy.common.AbstractStartAndShutdown;
import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.common.ProxyException;
+import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
import org.apache.rocketmq.proxy.common.ReceiptHandleGroup;
import org.apache.rocketmq.proxy.common.StartAndShutdown;
+import org.apache.rocketmq.proxy.common.utils.ExceptionUtils;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.slf4j.Logger;
@@ -50,17 +55,45 @@ import org.slf4j.LoggerFactory;
public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
- protected final ConcurrentMap<String, ReceiptHandleGroup> receiptHandleGroupMap;
+ protected final ConcurrentMap<ReceiptHandleGroupKey, ReceiptHandleGroup> receiptHandleGroupMap;
protected final ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RenewalScheduledThread_"));
- protected final ExecutorService renewalWorkerService = ThreadPoolMonitor.createAndMonitor(
- 2, 4, 0, TimeUnit.MILLISECONDS,
- "RenewalWorkerThread_", 10000
- );
+ protected ThreadPoolExecutor renewalWorkerService;
protected final MessagingProcessor messagingProcessor;
public ReceiptHandleProcessor(MessagingProcessor messagingProcessor) {
this.messagingProcessor = messagingProcessor;
+ this.receiptHandleGroupMap = new ConcurrentHashMap<>();
+ ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+ this.renewalWorkerService = ThreadPoolMonitor.createAndMonitor(
+ proxyConfig.getRenewThreadPoolNums(),
+ proxyConfig.getRenewMaxThreadPoolNums(),
+ 1, TimeUnit.MINUTES,
+ "RenewalWorkerThread",
+ proxyConfig.getRenewThreadPoolQueueCapacity()
+ );
+ this.init();
+ }
+
+ protected void init() {
+ this.registerConsumerListener();
+ this.renewalWorkerService.setRejectedExecutionHandler((r, executor) -> log.warn("add renew task failed. queueSize:{}", executor.getQueue().size()));
+ this.appendStartAndShutdown(new StartAndShutdown() {
+ @Override
+ public void start() throws Exception {
+ scheduledExecutorService.scheduleWithFixedDelay(() -> scheduleRenewTask(), 0,
+ ConfigurationManager.getProxyConfig().getRenewSchedulePeriodMillis(), TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void shutdown() throws Exception {
+ scheduledExecutorService.shutdown();
+ clearAllHandle();
+ }
+ });
+ }
+
+ protected void registerConsumerListener() {
this.messagingProcessor.registerConsumerListener(new ConsumerIdsChangeListener() {
@Override
public void handle(ConsumerGroupEvent event, String group, Object... args) {
@@ -70,7 +103,7 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
}
if (args[0] instanceof ClientChannelInfo) {
ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
- clearGroup(buildKey(clientChannelInfo.getClientId(), group));
+ clearGroup(new ReceiptHandleGroupKey(clientChannelInfo.getClientId(), group));
}
}
}
@@ -80,25 +113,6 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
}
});
- this.receiptHandleGroupMap = new ConcurrentHashMap<>();
- this.init();
- }
-
- protected void init() {
- this.appendStartAndShutdown(new StartAndShutdown() {
- @Override
- public void start() throws Exception {
- log.info("scan for renewal start.");
- scheduledExecutorService.scheduleAtFixedRate(() -> scheduleRenewTask(), 0,
- ConfigurationManager.getProxyConfig().getRenewSchedulePeriodMillis(), TimeUnit.MILLISECONDS);
- log.info("renewal queue has started");
- }
-
- @Override
- public void shutdown() throws Exception {
- scheduledExecutorService.shutdown();
- }
- });
}
protected ProxyContext createContext(String actionName) {
@@ -106,90 +120,121 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
}
protected void scheduleRenewTask() {
- ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
- for (Map.Entry<String, ReceiptHandleGroup> entry : receiptHandleGroupMap.entrySet()) {
- String key = entry.getKey();
- Pair<String, String> clientIdAndGroup = parseKey(key);
- if (clientIdAndGroup == null) {
- log.warn("client id and group is empty. key:{}, receiptHandleGroup:{}", key, entry.getValue());
- clearGroup(key);
- continue;
- }
- if (clientIsOffline(clientIdAndGroup.getLeft(), clientIdAndGroup.getRight())) {
- clearGroup(key);
- continue;
- }
-
- ReceiptHandleGroup group = entry.getValue();
- group.scan((msgID, handleStr, v) -> {
- ReceiptHandle handle = ReceiptHandle.decode(v.getReceiptHandle());
- long now = System.currentTimeMillis();
- if (handle.getNextVisibleTime() - now > proxyConfig.getRenewAheadTimeMillis()) {
- return;
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ try {
+ ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+ for (Map.Entry<ReceiptHandleGroupKey, ReceiptHandleGroup> entry : receiptHandleGroupMap.entrySet()) {
+ ReceiptHandleGroupKey key = entry.getKey();
+ if (clientIsOffline(key)) {
+ clearGroup(key);
+ continue;
}
- SubscriptionGroupConfig subscriptionGroupConfig =
- messagingProcessor.getMetadataService().getSubscriptionGroupConfig(v.getGroup());
- if (subscriptionGroupConfig == null) {
- log.error("Group's subscriptionGroupConfig is null, group = {}", v.getGroup());
- return;
- }
- RetryPolicy retryPolicy = subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy();
- renewalWorkerService.submit(() -> renewMessage(key, msgID, v, handle, retryPolicy));
- });
+
+ ReceiptHandleGroup group = entry.getValue();
+ group.scan((msgID, handleStr, v) -> {
+ long current = System.currentTimeMillis();
+ ReceiptHandle handle = ReceiptHandle.decode(v.getReceiptHandleStr());
+ if (handle.getNextVisibleTime() - current > proxyConfig.getRenewAheadTimeMillis()) {
+ return;
+ }
+ renewalWorkerService.submit(() -> renewMessage(group, msgID, handleStr));
+ });
+ }
+ } catch (Exception e) {
+ log.error("unexpect error when schedule renew task", e);
}
- log.info("scan for renewal done.");
+ log.info("scan for renewal done. cost:{}ms", stopwatch.elapsed().toMillis());
+ }
+
+ protected void renewMessage(ReceiptHandleGroup group, String msgID, String handleStr) {
+ try {
+ group.computeIfPresent(msgID, handleStr, this::startRenewMessage);
+ } catch (Exception e) {
+ log.error("error when renew message. msgID:{}, handleStr:{}", msgID, handleStr, e);
+ }
}
- protected void renewMessage(String key, String msgID, MessageReceiptHandle messageReceiptHandle,
- ReceiptHandle handle, RetryPolicy retryPolicy) {
+ protected CompletableFuture<MessageReceiptHandle> startRenewMessage(MessageReceiptHandle messageReceiptHandle) {
+ CompletableFuture<MessageReceiptHandle> resFuture = new CompletableFuture<>();
ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
ProxyContext context = createContext("RenewMessage");
+ ReceiptHandle handle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr());
long current = System.currentTimeMillis();
- if (current - messageReceiptHandle.getTimestamp() < messageReceiptHandle.getExpectInvisibleTime()) {
- CompletableFuture<AckResult> future =
- messagingProcessor.changeInvisibleTime(context, handle, messageReceiptHandle.getMessageId(),
- messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), proxyConfig.getRenewSliceTimeMillis());
- future.thenAccept(ackResult -> {
- if (AckStatus.OK.equals(ackResult.getStatus())) {
- messageReceiptHandle.update(ackResult.getExtraInfo());
- addReceiptHandle(key, msgID, messageReceiptHandle.getOriginalReceiptHandle(), messageReceiptHandle);
- }
- });
- } else {
- CompletableFuture<AckResult> future = messagingProcessor.changeInvisibleTime(context,
- handle, messageReceiptHandle.getMessageId(), messageReceiptHandle.getGroup(),
- messageReceiptHandle.getTopic(), retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()));
- future.thenAccept(ackResult -> {
- if (AckStatus.OK.equals(ackResult.getStatus())) {
- removeReceiptHandle(key, msgID, messageReceiptHandle.getOriginalReceiptHandle());
+ try {
+ if (messageReceiptHandle.getRenewRetryTimes() >= proxyConfig.getMaxRenewRetryTimes()) {
+ log.warn("handle has exceed max renewRetryTimes. handle:{}", messageReceiptHandle);
+ return CompletableFuture.completedFuture(null);
+ }
+ if (current - messageReceiptHandle.getTimestamp() < messageReceiptHandle.getExpectInvisibleTime()) {
+ CompletableFuture<AckResult> future =
+ messagingProcessor.changeInvisibleTime(context, handle, messageReceiptHandle.getMessageId(),
+ messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), proxyConfig.getRenewSliceTimeMillis());
+ future.whenComplete((ackResult, throwable) -> {
+ if (throwable != null) {
+ log.error("error when renew. handle:{}", messageReceiptHandle, throwable);
+ if (renewExceptionNeedRetry(throwable)) {
+ messageReceiptHandle.incrementAndGetRenewRetryTimes();
+ resFuture.complete(messageReceiptHandle);
+ } else {
+ resFuture.complete(null);
+ }
+ } else if (AckStatus.OK.equals(ackResult.getStatus())) {
+ messageReceiptHandle.updateReceiptHandle(ackResult.getExtraInfo());
+ messageReceiptHandle.resetRenewRetryTimes();
+ resFuture.complete(messageReceiptHandle);
+ } else {
+ log.error("renew response is not ok. result:{}, handle:{}", ackResult, messageReceiptHandle, throwable);
+ resFuture.complete(null);
+ }
+ });
+ } else {
+ SubscriptionGroupConfig subscriptionGroupConfig =
+ messagingProcessor.getMetadataService().getSubscriptionGroupConfig(messageReceiptHandle.getGroup());
+ if (subscriptionGroupConfig == null) {
+ log.error("group's subscriptionGroupConfig is null when renew. handle: {}", messageReceiptHandle);
+ return CompletableFuture.completedFuture(null);
}
- });
+ RetryPolicy retryPolicy = subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy();
+ CompletableFuture<AckResult> future = messagingProcessor.changeInvisibleTime(context,
+ handle, messageReceiptHandle.getMessageId(), messageReceiptHandle.getGroup(),
+ messageReceiptHandle.getTopic(), retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()));
+ future.whenComplete((ackResult, throwable) -> {
+ if (throwable != null) {
+ log.error("error when nack in renew. handle:{}", messageReceiptHandle, throwable);
+ }
+ resFuture.complete(null);
+ });
+ }
+ } catch (Throwable t) {
+ log.error("unexpect error when renew message, stop to renew it. handle:{}", messageReceiptHandle, t);
+ resFuture.complete(null);
}
+ return resFuture;
}
- protected String buildKey(String clientID, String group) {
- return clientID + "%" + group;
- }
-
- protected Pair<String, String> parseKey(String key) {
- String[] strs = key.split("%");
- if (strs.length < 2) {
- return null;
+ protected boolean renewExceptionNeedRetry(Throwable t) {
+ t = ExceptionUtils.getRealException(t);
+ if (t instanceof ProxyException) {
+ ProxyException proxyException = (ProxyException) t;
+ if (ProxyExceptionCode.INVALID_BROKER_NAME.equals(proxyException.getCode()) ||
+ ProxyExceptionCode.INVALID_RECEIPT_HANDLE.equals(proxyException.getCode())) {
+ return false;
+ }
}
- return Pair.of(strs[0], strs[1]);
+ return true;
}
- protected boolean clientIsOffline(String clientID, String group) {
- return this.messagingProcessor.findConsumerChannel(createContext("JudgeClientOnline"), group, clientID) == null;
+ protected boolean clientIsOffline(ReceiptHandleGroupKey groupKey) {
+ return this.messagingProcessor.findConsumerChannel(createContext("JudgeClientOnline"), groupKey.group, groupKey.clientId) == null;
}
public void addReceiptHandle(String clientID, String group, String msgID, String receiptHandle,
MessageReceiptHandle messageReceiptHandle) {
- this.addReceiptHandle(buildKey(clientID, group), msgID, receiptHandle, messageReceiptHandle);
+ this.addReceiptHandle(new ReceiptHandleGroupKey(clientID, group), msgID, receiptHandle, messageReceiptHandle);
}
- protected void addReceiptHandle(String key, String msgID, String receiptHandle,
+ protected void addReceiptHandle(ReceiptHandleGroupKey key, String msgID, String receiptHandle,
MessageReceiptHandle messageReceiptHandle) {
if (key == null) {
return;
@@ -199,44 +244,21 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
}
public MessageReceiptHandle removeReceiptHandle(String clientID, String group, String msgID, String receiptHandle) {
- return this.removeReceiptHandle(buildKey(clientID, group), msgID, receiptHandle);
+ return this.removeReceiptHandle(new ReceiptHandleGroupKey(clientID, group), msgID, receiptHandle);
}
- protected MessageReceiptHandle removeReceiptHandle(String key, String msgID, String receiptHandle) {
+ protected MessageReceiptHandle removeReceiptHandle(ReceiptHandleGroupKey key, String msgID, String receiptHandle) {
if (key == null) {
return null;
}
- AtomicReference<MessageReceiptHandle> res = new AtomicReference<>();
- receiptHandleGroupMap.computeIfPresent(key, (k, v) -> {
- res.set(v.remove(msgID, receiptHandle));
- if (v.isEmpty()) {
- return null;
- }
- return v;
- });
- return res.get();
- }
-
- public MessageReceiptHandle removeOneReceiptHandle(String clientID, String group, String msgID) {
- return removeOneReceiptHandle(buildKey(clientID, group), msgID);
- }
-
- protected MessageReceiptHandle removeOneReceiptHandle(String key, String msgID) {
- if (key == null) {
+ ReceiptHandleGroup handleGroup = receiptHandleGroupMap.get(key);
+ if (handleGroup == null) {
return null;
}
- AtomicReference<MessageReceiptHandle> res = new AtomicReference<>();
- receiptHandleGroupMap.computeIfPresent(key, (k, v) -> {
- res.set(v.removeOne(msgID));
- if (v.isEmpty()) {
- return null;
- }
- return v;
- });
- return res.get();
+ return handleGroup.remove(msgID, receiptHandle);
}
- protected void clearGroup(String key) {
+ protected void clearGroup(ReceiptHandleGroupKey key) {
if (key == null) {
return;
}
@@ -246,16 +268,75 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
if (handleGroup == null) {
return;
}
- handleGroup.scan((msgID, handle, messageReceiptHandle) -> {
- ReceiptHandle receiptHandle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandle());
- messagingProcessor.changeInvisibleTime(
- context,
- receiptHandle,
- messageReceiptHandle.getMessageId(),
- messageReceiptHandle.getGroup(),
- messageReceiptHandle.getTopic(),
- proxyConfig.getInvisibleTimeMillisWhenClear()
- );
+ handleGroup.scan((msgID, handle, v) -> {
+ try {
+ handleGroup.computeIfPresent(msgID, handle, messageReceiptHandle -> {
+ ReceiptHandle receiptHandle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr());
+ messagingProcessor.changeInvisibleTime(
+ context,
+ receiptHandle,
+ messageReceiptHandle.getMessageId(),
+ messageReceiptHandle.getGroup(),
+ messageReceiptHandle.getTopic(),
+ proxyConfig.getInvisibleTimeMillisWhenClear()
+ );
+ return CompletableFuture.completedFuture(null);
+ });
+ } catch (Exception e) {
+ log.error("error when clear handle for group. key:{}", key, e);
+ }
});
}
+
+    /**
+     * Calls {@code clearGroup} once for every key currently present in
+     * {@code receiptHandleGroupMap}, logging before and after the sweep.
+     */
+    protected void clearAllHandle() {
+        log.info("start clear all handle in receiptHandleProcessor");
+        for (ReceiptHandleGroupKey groupKey : receiptHandleGroupMap.keySet()) {
+            clearGroup(groupKey);
+        }
+        log.info("clear all handle in receiptHandleProcessor done");
+    }
+
+    /**
+     * Key of {@code receiptHandleGroupMap}: a client id paired with a consumer group.
+     * Equality, hashing and the string form are all derived from those two fields.
+     */
+    public static class ReceiptHandleGroupKey {
+        private final String clientId;
+        private final String group;
+
+        public ReceiptHandleGroupKey(String clientId, String group) {
+            this.clientId = clientId;
+            this.group = group;
+        }
+
+        public String getClientId() {
+            return this.clientId;
+        }
+
+        public String getGroup() {
+            return this.group;
+        }
+
+        /** Two keys are equal when they are of the same class and both fields match. */
+        @Override
+        public boolean equals(Object o) {
+            if (o == this) {
+                return true;
+            }
+            if (o == null || o.getClass() != getClass()) {
+                return false;
+            }
+            ReceiptHandleGroupKey other = (ReceiptHandleGroupKey) o;
+            if (!Objects.equal(this.clientId, other.clientId)) {
+                return false;
+            }
+            return Objects.equal(this.group, other.group);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(this.clientId, this.group);
+        }
+
+        @Override
+        public String toString() {
+            MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this);
+            helper.add("clientId", this.clientId);
+            helper.add("group", this.group);
+            return helper.toString();
+        }
+    }
}
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupTest.java
new file mode 100644
index 000000000..6aca7ac6f
--- /dev/null
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.proxy.common;
+
+import java.time.Duration;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.rocketmq.common.consumer.ReceiptHandle;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
+import org.apache.rocketmq.proxy.common.utils.FutureUtils;
+import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Concurrency tests for {@link ReceiptHandleGroup}: racing {@code remove} against
+ * {@code computeIfPresent}, and racing several {@code remove} calls against each other.
+ */
+public class ReceiptHandleGroupTest extends InitConfigAndLoggerTest {
+
+    private static final String TOPIC = "topic";
+    private static final String GROUP = "group";
+    private ReceiptHandleGroup receiptHandleGroup;
+    private String msgID;
+    private final Random random = new Random();
+    // One pool per test, released in shutdownExecutor(). Creating a fresh cached pool inside
+    // every computeIfPresent callback would leave its worker threads behind after the test.
+    private ExecutorService executor;
+
+    @Before
+    public void before() throws Throwable {
+        super.before();
+        receiptHandleGroup = new ReceiptHandleGroup();
+        msgID = MessageClientIDSetter.createUniqID();
+        executor = Executors.newCachedThreadPool();
+    }
+
+    /** Stops the per-test pool so its worker threads do not linger once the test is over. */
+    @After
+    public void shutdownExecutor() {
+        if (executor != null) {
+            executor.shutdownNow();
+        }
+    }
+
+    /** Builds an encoded receipt handle whose queue id and offset are random values in [0, 10). */
+    protected String createHandle() {
+        return ReceiptHandle.builder()
+            .startOffset(0L)
+            .retrieveTime(System.currentTimeMillis())
+            .invisibleTime(3000)
+            .reviveQueueId(1)
+            .topicType(ReceiptHandle.NORMAL_TOPIC)
+            .brokerName("brokerName")
+            .queueId(random.nextInt(10))
+            .offset(random.nextInt(10))
+            .commitLogOffset(0L)
+            .build().encode();
+    }
+
+    /**
+     * Releases a {@code remove} and a {@code computeIfPresent} on the same entry together via a
+     * latch. The callback swaps the stored handle to {@code handle2}; the test expects
+     * {@code remove} to hand back the entry carrying {@code handle2} and the group to end up empty.
+     */
+    @Test
+    public void testRemoveWhenComputeIfPresent() {
+        String handle1 = createHandle();
+        String handle2 = createHandle();
+        AtomicReference<MessageReceiptHandle> removeHandleRef = new AtomicReference<>();
+
+        receiptHandleGroup.put(msgID, handle1, createMessageReceiptHandle(handle1, msgID));
+        CountDownLatch latch = new CountDownLatch(2);
+        Thread removeThread = new Thread(() -> {
+            try {
+                latch.countDown();
+                latch.await();
+                removeHandleRef.set(receiptHandleGroup.remove(msgID, handle1));
+            } catch (Exception ignored) {
+                // swallowed on purpose: removeHandleRef stays null and the await below times out
+            }
+        }, "removeThread");
+        Thread computeThread = new Thread(() -> {
+            try {
+                receiptHandleGroup.computeIfPresent(msgID, handle1, messageReceiptHandle -> {
+                    try {
+                        latch.countDown();
+                        latch.await();
+                    } catch (Exception ignored) {
+                    }
+                    messageReceiptHandle.updateReceiptHandle(handle2);
+                    return FutureUtils.addExecutor(CompletableFuture.completedFuture(messageReceiptHandle), executor);
+                });
+            } catch (Exception ignored) {
+            }
+        }, "computeThread");
+        removeThread.start();
+        computeThread.start();
+
+        await().atMost(Duration.ofSeconds(1)).until(() -> removeHandleRef.get() != null);
+        assertEquals(handle2, removeHandleRef.get().getReceiptHandleStr());
+        assertTrue(receiptHandleGroup.isEmpty());
+    }
+
+    /**
+     * Same race as above, but the {@code computeIfPresent} callback completes with {@code null}.
+     * The test expects the concurrent {@code remove} to return {@code null} and the group to be empty.
+     */
+    @Test
+    public void testRemoveWhenComputeIfPresentReturnNull() {
+        String handle1 = createHandle();
+        AtomicBoolean removeCalled = new AtomicBoolean(false);
+        AtomicReference<MessageReceiptHandle> removeHandleRef = new AtomicReference<>();
+
+        receiptHandleGroup.put(msgID, handle1, createMessageReceiptHandle(handle1, msgID));
+        CountDownLatch latch = new CountDownLatch(2);
+        Thread removeThread = new Thread(() -> {
+            try {
+                latch.countDown();
+                latch.await();
+                removeHandleRef.set(receiptHandleGroup.remove(msgID, handle1));
+                removeCalled.set(true);
+            } catch (Exception ignored) {
+            }
+        }, "removeThread");
+        Thread computeThread = new Thread(() -> {
+            try {
+                receiptHandleGroup.computeIfPresent(msgID, handle1, messageReceiptHandle -> {
+                    try {
+                        latch.countDown();
+                        latch.await();
+                    } catch (Exception ignored) {
+                    }
+                    return FutureUtils.addExecutor(CompletableFuture.completedFuture(null), executor);
+                });
+            } catch (Exception ignored) {
+            }
+        }, "computeThread");
+        removeThread.start();
+        computeThread.start();
+
+        await().atMost(Duration.ofSeconds(1)).until(removeCalled::get);
+        assertNull(removeHandleRef.get());
+        assertTrue(receiptHandleGroup.isEmpty());
+    }
+
+    /**
+     * Lets at least three threads race to remove the same entry and expects exactly one of
+     * them to receive the stored handle.
+     */
+    @Test
+    public void testRemoveMultiThread() {
+        String handle1 = createHandle();
+        AtomicReference<MessageReceiptHandle> removeHandleRef = new AtomicReference<>();
+        AtomicInteger count = new AtomicInteger();
+
+        receiptHandleGroup.put(msgID, handle1, createMessageReceiptHandle(handle1, msgID));
+        int threadNum = Math.max(Runtime.getRuntime().availableProcessors(), 3);
+        CountDownLatch latch = new CountDownLatch(threadNum);
+        for (int i = 0; i < threadNum; i++) {
+            Thread thread = new Thread(() -> {
+                try {
+                    latch.countDown();
+                    latch.await();
+                    MessageReceiptHandle handle = receiptHandleGroup.remove(msgID, handle1);
+                    if (handle != null) {
+                        removeHandleRef.set(handle);
+                        count.incrementAndGet();
+                    }
+                } catch (Exception ignored) {
+                }
+            });
+            thread.start();
+        }
+
+        await().atMost(Duration.ofSeconds(1)).untilAsserted(() -> assertEquals(1, count.get()));
+        assertEquals(handle1, removeHandleRef.get().getReceiptHandleStr());
+        assertTrue(receiptHandleGroup.isEmpty());
+    }
+
+    private MessageReceiptHandle createMessageReceiptHandle(String handle, String msgID) {
+        return new MessageReceiptHandle(GROUP, TOPIC, 0, handle, msgID, 0, 0);
+    }
+}
\ No newline at end of file
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java
index baa7a59ef..012649465 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java
@@ -93,7 +93,7 @@ public class ChangeInvisibleDurationActivityTest extends BaseActivityTest {
any(), receiptHandleCaptor.capture(), anyString(), anyString(), anyString(), invisibleTimeArgumentCaptor.capture()
)).thenReturn(CompletableFuture.completedFuture(ackResult));
when(receiptHandleProcessor.removeReceiptHandle(anyString(), anyString(), anyString(), anyString()))
- .thenReturn(new MessageReceiptHandle("group", "topic", 0, savedHandleStr, "msgId", 0, 0, 3000));
+ .thenReturn(new MessageReceiptHandle("group", "topic", 0, savedHandleStr, "msgId", 0, 0));
ChangeInvisibleDurationResponse response = this.changeInvisibleDurationActivity.changeInvisibleDuration(
createContext(),
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java
index f3a6229a7..cd3c48e1a 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java
@@ -76,7 +76,7 @@ public class ForwardMessageToDLQActivityTest extends BaseActivityTest {
String savedHandleStr = buildReceiptHandle("topic", System.currentTimeMillis(),3000);
when(receiptHandleProcessor.removeReceiptHandle(anyString(), anyString(), anyString(), anyString()))
- .thenReturn(new MessageReceiptHandle("group", "topic", 0, savedHandleStr, "msgId", 0, 0, 3000));
+ .thenReturn(new MessageReceiptHandle("group", "topic", 0, savedHandleStr, "msgId", 0, 0));
ForwardMessageToDeadLetterQueueResponse response = this.forwardMessageToDLQActivity.forwardMessageToDeadLetterQueue(
createContext(),
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
index 1f8633258..93ab0210c 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
@@ -17,21 +17,38 @@
package org.apache.rocketmq.proxy.processor;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.client.consumer.AckResult;
import org.apache.rocketmq.client.consumer.AckStatus;
+import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.proxy.common.ContextVariable;
import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.common.ProxyException;
+import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
+import org.apache.rocketmq.proxy.common.ReceiptHandleGroup;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.config.ProxyConfig;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class ReceiptHandleProcessorTest extends BaseProcessorTest {
private ReceiptHandleProcessor receiptHandleProcessor;
@@ -43,30 +60,32 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
private static final int QUEUE_ID = 1;
private static final String MESSAGE_ID = "messageId";
private static final long OFFSET = 123L;
- private static final long INVISIBLE_TIME = 100000L;
+ private static final long INVISIBLE_TIME = 60000L;
private static final int RECONSUME_TIMES = 1;
private static final String MSG_ID = MessageClientIDSetter.createUniqID();
private MessageReceiptHandle messageReceiptHandle;
- private final String receiptHandle = ReceiptHandle.builder()
- .startOffset(0L)
- .retrieveTime(0)
- .invisibleTime(INVISIBLE_TIME)
- .reviveQueueId(1)
- .topicType(ReceiptHandle.NORMAL_TOPIC)
- .brokerName(BROKER_NAME)
- .queueId(QUEUE_ID)
- .offset(OFFSET)
- .commitLogOffset(0L)
- .build().encode();
+ private String receiptHandle;
@Before
public void setup() {
+ ProxyConfig config = ConfigurationManager.getProxyConfig();
+ receiptHandle = ReceiptHandle.builder()
+ .startOffset(0L)
+ .retrieveTime(System.currentTimeMillis() - INVISIBLE_TIME + config.getRenewAheadTimeMillis() - 5)
+ .invisibleTime(INVISIBLE_TIME)
+ .reviveQueueId(1)
+ .topicType(ReceiptHandle.NORMAL_TOPIC)
+ .brokerName(BROKER_NAME)
+ .queueId(QUEUE_ID)
+ .offset(OFFSET)
+ .commitLogOffset(0L)
+ .build().encode();
PROXY_CONTEXT.withVal(ContextVariable.CLIENT_ID, "channel-id");
receiptHandleProcessor = new ReceiptHandleProcessor(messagingProcessor);
Mockito.doNothing().when(messagingProcessor).registerConsumerListener(Mockito.any(ConsumerIdsChangeListener.class));
messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, receiptHandle, MESSAGE_ID, OFFSET,
- RECONSUME_TIMES, INVISIBLE_TIME);
+ RECONSUME_TIMES);
}
@Test
@@ -83,6 +102,7 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
@Test
public void testRenewReceiptHandle() {
+ ProxyConfig config = ConfigurationManager.getProxyConfig();
String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID, receiptHandle, messageReceiptHandle);
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
@@ -91,7 +111,7 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
long newInvisibleTime = 2000L;
ReceiptHandle newReceiptHandleClass = ReceiptHandle.builder()
.startOffset(0L)
- .retrieveTime(0)
+ .retrieveTime(System.currentTimeMillis() - newInvisibleTime + config.getRenewAheadTimeMillis() - 5)
.invisibleTime(newInvisibleTime)
.reviveQueueId(1)
.topicType(ReceiptHandle.NORMAL_TOPIC)
@@ -117,6 +137,114 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis()));
}
+ @Test
+ public void testRenewExceedMaxRenewTimes() {
+ ProxyConfig config = ConfigurationManager.getProxyConfig();
+ String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
+ Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channelId))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+ receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID, receiptHandle, messageReceiptHandle);
+
+ CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>();
+ ackResultFuture.completeExceptionally(new MQClientException(0, "error"));
+ Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
+ Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis())))
+ .thenReturn(ackResultFuture);
+
+ await().atMost(Duration.ofSeconds(1)).until(() -> {
+ receiptHandleProcessor.scheduleRenewTask();
+ try {
+ ReceiptHandleGroup receiptHandleGroup = receiptHandleProcessor.receiptHandleGroupMap.values().stream().findFirst().get();
+ return receiptHandleGroup.isEmpty();
+ } catch (Exception e) {
+ return false;
+ }
+ });
+ Mockito.verify(messagingProcessor, Mockito.times(config.getMaxRenewRetryTimes()))
+ .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
+ Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis()));
+ }
+
+ @Test
+ public void testRenewWithInvalidHandle() {
+ String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
+ Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channelId))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+ receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID, receiptHandle, messageReceiptHandle);
+
+ CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>();
+ ackResultFuture.completeExceptionally(new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "error"));
+ Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
+ Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis())))
+ .thenReturn(ackResultFuture);
+
+ await().atMost(Duration.ofSeconds(1)).until(() -> {
+ receiptHandleProcessor.scheduleRenewTask();
+ try {
+ ReceiptHandleGroup receiptHandleGroup = receiptHandleProcessor.receiptHandleGroupMap.values().stream().findFirst().get();
+ return receiptHandleGroup.isEmpty();
+ } catch (Exception e) {
+ return false;
+ }
+ });
+ }
+
+ @Test
+ public void testRenewWithErrorThenOK() {
+ ProxyConfig config = ConfigurationManager.getProxyConfig();
+ String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
+ Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channelId))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+ receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID, receiptHandle, messageReceiptHandle);
+
+ AtomicInteger count = new AtomicInteger(0);
+ List<CompletableFuture<AckResult>> futureList = new ArrayList<>();
+ {
+ CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>();
+ ackResultFuture.completeExceptionally(new MQClientException(0, "error"));
+ futureList.add(ackResultFuture);
+ futureList.add(ackResultFuture);
+ }
+ {
+ long newInvisibleTime = 2000L;
+ ReceiptHandle newReceiptHandleClass = ReceiptHandle.builder()
+ .startOffset(0L)
+ .retrieveTime(System.currentTimeMillis() - newInvisibleTime + config.getRenewAheadTimeMillis() - 5)
+ .invisibleTime(newInvisibleTime)
+ .reviveQueueId(1)
+ .topicType(ReceiptHandle.NORMAL_TOPIC)
+ .brokerName(BROKER_NAME)
+ .queueId(QUEUE_ID)
+ .offset(OFFSET)
+ .commitLogOffset(0L)
+ .build();
+ String newReceiptHandle = newReceiptHandleClass.encode();
+ AckResult ackResult = new AckResult();
+ ackResult.setStatus(AckStatus.OK);
+ ackResult.setExtraInfo(newReceiptHandle);
+ futureList.add(CompletableFuture.completedFuture(ackResult));
+ }
+ {
+ CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>();
+ ackResultFuture.completeExceptionally(new MQClientException(0, "error"));
+ futureList.add(ackResultFuture);
+ futureList.add(ackResultFuture);
+ futureList.add(ackResultFuture);
+ futureList.add(ackResultFuture);
+ }
+ Mockito.doAnswer((Answer<CompletableFuture<AckResult>>) mock -> {
+ return futureList.get(count.getAndIncrement());
+ }).when(messagingProcessor).changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
+ Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis()));
+ await().atMost(Duration.ofSeconds(1)).until(() -> {
+ receiptHandleProcessor.scheduleRenewTask();
+ try {
+ ReceiptHandleGroup receiptHandleGroup = receiptHandleProcessor.receiptHandleGroupMap.values().stream().findFirst().get();
+ return receiptHandleGroup.isEmpty();
+ } catch (Exception e) {
+ return false;
+ }
+ });
+ assertEquals(6, count.get());
+ }
+
@Test
public void testRenewReceiptHandleWhenTimeout() {
long newInvisibleTime = 0L;
@@ -131,17 +259,59 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
.offset(OFFSET)
.commitLogOffset(0L)
.build().encode();
- messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, receiptHandle, MESSAGE_ID, OFFSET,
- RECONSUME_TIMES, newInvisibleTime);
+ messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
+ RECONSUME_TIMES);
String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID, newReceiptHandle, messageReceiptHandle);
Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channelId))).thenReturn(Mockito.mock(ClientChannelInfo.class));
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig);
+ Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyLong()))
+ .thenReturn(CompletableFuture.completedFuture(new AckResult()));
receiptHandleProcessor.scheduleRenewTask();
Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1))
.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(groupConfig.getGroupRetryPolicy().getRetryPolicy().nextDelayDuration(RECONSUME_TIMES)));
+
+ ReceiptHandleGroup receiptHandleGroup = receiptHandleProcessor.receiptHandleGroupMap.values().stream().findFirst().get();
+ assertTrue(receiptHandleGroup.isEmpty());
+ }
+
+    /**
+     * Adds a handle built with retrieve time 0 and invisible time 0 while the metadata service
+     * returns no subscription group config. The test expects the handle group to drain without
+     * any {@code changeInvisibleTime} call being made.
+     */
+    @Test
+    public void testRenewReceiptHandleWhenTimeoutWithNoSubscription() {
+        long newInvisibleTime = 0L;
+        String newReceiptHandle = ReceiptHandle.builder()
+            .startOffset(0L)
+            .retrieveTime(0)
+            .invisibleTime(newInvisibleTime)
+            .reviveQueueId(1)
+            .topicType(ReceiptHandle.NORMAL_TOPIC)
+            .brokerName(BROKER_NAME)
+            .queueId(QUEUE_ID)
+            .offset(OFFSET)
+            .commitLogOffset(0L)
+            .build().encode();
+        messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
+            RECONSUME_TIMES);
+        String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
+        receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID, newReceiptHandle, messageReceiptHandle);
+        Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channelId))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+        Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(null);
+        Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyLong()))
+            .thenReturn(CompletableFuture.completedFuture(new AckResult()));
+        receiptHandleProcessor.scheduleRenewTask();
+        await().atMost(Duration.ofSeconds(1)).until(() -> {
+            try {
+                ReceiptHandleGroup receiptHandleGroup = receiptHandleProcessor.receiptHandleGroupMap.values().stream().findFirst().get();
+                return receiptHandleGroup.isEmpty();
+            } catch (Exception e) {
+                return false;
+            }
+        });
+
+        // The await above has already seen the group drain, so check the interaction count
+        // directly. timeout(..).times(0) is satisfied on its first poll and never waits, which
+        // made the timeout a misleading no-op.
+        Mockito.verify(messagingProcessor, Mockito.never())
+            .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.anyString(),
+                Mockito.anyString(), Mockito.anyString(), Mockito.anyLong());
}
@Test
@@ -158,7 +328,7 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
.commitLogOffset(0L)
.build().encode();
messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
- RECONSUME_TIMES, INVISIBLE_TIME);
+ RECONSUME_TIMES);
String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID, newReceiptHandle, messageReceiptHandle);
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
@@ -173,8 +343,8 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
@Test
public void testRemoveReceiptHandle() {
String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
- receiptHandleProcessor.addReceiptHandle(channelId, MSG_ID, receiptHandle, messageReceiptHandle);
- receiptHandleProcessor.removeReceiptHandle(channelId, MSG_ID, receiptHandle);
+ receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID, receiptHandle, messageReceiptHandle);
+ receiptHandleProcessor.removeReceiptHandle(channelId, GROUP, MSG_ID, receiptHandle);
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig);
receiptHandleProcessor.scheduleRenewTask();
@@ -186,8 +356,8 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
@Test
public void testClearGroup() {
String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
- receiptHandleProcessor.addReceiptHandle(channelId, MSG_ID, receiptHandle, messageReceiptHandle);
- receiptHandleProcessor.clearGroup(channelId);
+ receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID, receiptHandle, messageReceiptHandle);
+ receiptHandleProcessor.clearGroup(new ReceiptHandleProcessor.ReceiptHandleGroupKey(channelId, GROUP));
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig);
receiptHandleProcessor.scheduleRenewTask();
@@ -195,4 +365,14 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getInvisibleTimeMillisWhenClear()));
}
+
+    /**
+     * Captures the listener registered with the messaging processor, adds one receipt handle,
+     * fires a {@code CLIENT_UNREGISTER} event for the same client and group, and expects
+     * {@code receiptHandleGroupMap} to be empty afterwards.
+     */
+    @Test
+    public void testClientOffline() {
+        ArgumentCaptor<ConsumerIdsChangeListener> captor = ArgumentCaptor.forClass(ConsumerIdsChangeListener.class);
+        Mockito.verify(messagingProcessor, Mockito.times(1)).registerConsumerListener(captor.capture());
+
+        String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
+        receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID, receiptHandle, messageReceiptHandle);
+
+        ConsumerIdsChangeListener registeredListener = captor.getValue();
+        ClientChannelInfo offlineClient = new ClientChannelInfo(null, channelId, LanguageCode.JAVA, 0);
+        registeredListener.handle(ConsumerGroupEvent.CLIENT_UNREGISTER, GROUP, offlineClient);
+
+        assertTrue(receiptHandleProcessor.receiptHandleGroupMap.isEmpty());
+    }
}
\ No newline at end of file