You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/09/01 02:11:53 UTC

[rocketmq] branch develop updated: [ISSUE #4938, ISSUE #4939] fix the concurrency problems when renew and ack/changeInvisibleTime at the same time; retry renew for a limited number of times when some error occurred (#4940)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new cafb31961 [ISSUE #4938, ISSUE #4939] fix the concurrency problems when renew and ack/changeInvisibleTime at the same time; retry renew for a limited number of times when some error occurred (#4940)
cafb31961 is described below

commit cafb31961271785a26603a0726fc62393805cd96
Author: lk <ka...@alibaba-inc.com>
AuthorDate: Thu Sep 1 10:11:46 2022 +0800

    [ISSUE #4938, ISSUE #4939] fix the concurrency problems when renew and ack/changeInvisibleTime at the same time; retry renew for a limited number of times when some error occurred (#4940)
---
 .../proxy/common/MessageReceiptHandle.java         |  66 ++--
 .../rocketmq/proxy/common/ReceiptHandleGroup.java  | 149 +++++++--
 .../apache/rocketmq/proxy/config/ProxyConfig.java  |  45 +++
 .../proxy/grpc/v2/consumer/AckMessageActivity.java |   2 +-
 .../consumer/ChangeInvisibleDurationActivity.java  |   2 +-
 .../grpc/v2/consumer/ReceiveMessageActivity.java   |   2 +-
 .../v2/producer/ForwardMessageToDLQActivity.java   |   2 +-
 .../proxy/processor/ReceiptHandleProcessor.java    | 341 +++++++++++++--------
 .../proxy/common/ReceiptHandleGroupTest.java       | 177 +++++++++++
 .../ChangeInvisibleDurationActivityTest.java       |   2 +-
 .../producer/ForwardMessageToDLQActivityTest.java  |   2 +-
 .../processor/ReceiptHandleProcessorTest.java      | 222 ++++++++++++--
 12 files changed, 813 insertions(+), 199 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
index 64e7a122a..379e644f7 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
@@ -19,6 +19,8 @@ package org.apache.rocketmq.proxy.common;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.common.consumer.ReceiptHandle;
 
 public class MessageReceiptHandle {
     private final String group;
@@ -26,25 +28,27 @@ public class MessageReceiptHandle {
     private final int queueId;
     private final String messageId;
     private final long queueOffset;
-    private final String originalReceiptHandle;
-    private final long timestamp;
+    private final String originalReceiptHandleStr;
     private final int reconsumeTimes;
-    private final long expectInvisibleTime;
 
-    private String receiptHandle;
+    private final AtomicInteger renewRetryTimes = new AtomicInteger(0);
+    private volatile long timestamp;
+    private volatile long expectInvisibleTime;
+    private volatile String receiptHandleStr;
 
-    public MessageReceiptHandle(String group, String topic, int queueId, String receiptHandle, String messageId,
-        long queueOffset, int reconsumeTimes, long expectInvisibleTime) {
+    public MessageReceiptHandle(String group, String topic, int queueId, String receiptHandleStr, String messageId,
+        long queueOffset, int reconsumeTimes) {
+        ReceiptHandle receiptHandle = ReceiptHandle.decode(receiptHandleStr);
         this.group = group;
         this.topic = topic;
         this.queueId = queueId;
-        this.receiptHandle = receiptHandle;
-        this.originalReceiptHandle = receiptHandle;
+        this.receiptHandleStr = receiptHandleStr;
+        this.originalReceiptHandleStr = receiptHandleStr;
         this.messageId = messageId;
         this.queueOffset = queueOffset;
         this.reconsumeTimes = reconsumeTimes;
-        this.expectInvisibleTime = expectInvisibleTime;
-        this.timestamp = System.currentTimeMillis();
+        this.expectInvisibleTime = receiptHandle.getInvisibleTime();
+        this.timestamp = receiptHandle.getRetrieveTime();
     }
 
     @Override
@@ -59,14 +63,14 @@ public class MessageReceiptHandle {
         return queueId == handle.queueId && queueOffset == handle.queueOffset && timestamp == handle.timestamp
             && reconsumeTimes == handle.reconsumeTimes && expectInvisibleTime == handle.expectInvisibleTime
             && Objects.equal(group, handle.group) && Objects.equal(topic, handle.topic)
-            && Objects.equal(messageId, handle.messageId) && Objects.equal(originalReceiptHandle, handle.originalReceiptHandle)
-            && Objects.equal(receiptHandle, handle.receiptHandle);
+            && Objects.equal(messageId, handle.messageId) && Objects.equal(originalReceiptHandleStr, handle.originalReceiptHandleStr)
+            && Objects.equal(receiptHandleStr, handle.receiptHandleStr);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hashCode(group, topic, queueId, messageId, queueOffset, originalReceiptHandle, timestamp,
-            reconsumeTimes, expectInvisibleTime, receiptHandle);
+        return Objects.hashCode(group, topic, queueId, messageId, queueOffset, originalReceiptHandleStr, timestamp,
+            reconsumeTimes, expectInvisibleTime, receiptHandleStr);
     }
 
     @Override
@@ -77,11 +81,12 @@ public class MessageReceiptHandle {
             .add("queueId", queueId)
             .add("messageId", messageId)
             .add("queueOffset", queueOffset)
-            .add("originalReceiptHandle", originalReceiptHandle)
-            .add("timestamp", timestamp)
+            .add("originalReceiptHandleStr", originalReceiptHandleStr)
             .add("reconsumeTimes", reconsumeTimes)
+            .add("renewRetryTimes", renewRetryTimes)
+            .add("timestamp", timestamp)
             .add("expectInvisibleTime", expectInvisibleTime)
-            .add("receiptHandle", receiptHandle)
+            .add("receiptHandleStr", receiptHandleStr)
             .toString();
     }
 
@@ -97,12 +102,12 @@ public class MessageReceiptHandle {
         return queueId;
     }
 
-    public String getReceiptHandle() {
-        return receiptHandle;
+    public String getReceiptHandleStr() {
+        return receiptHandleStr;
     }
 
-    public String getOriginalReceiptHandle() {
-        return originalReceiptHandle;
+    public String getOriginalReceiptHandleStr() {
+        return originalReceiptHandleStr;
     }
 
     public String getMessageId() {
@@ -125,7 +130,22 @@ public class MessageReceiptHandle {
         return expectInvisibleTime;
     }
 
-    public void update(String receiptHandle) {
-        this.receiptHandle = receiptHandle;
+    public void updateReceiptHandle(String receiptHandleStr) {
+        ReceiptHandle receiptHandle = ReceiptHandle.decode(receiptHandleStr);
+        this.receiptHandleStr = receiptHandleStr;
+        this.expectInvisibleTime = receiptHandle.getInvisibleTime();
+        this.timestamp = receiptHandle.getRetrieveTime();
+    }
+
+    public int incrementAndGetRenewRetryTimes() {
+        return this.renewRetryTimes.incrementAndGet();
+    }
+
+    public void resetRenewRetryTimes() {
+        this.renewRetryTimes.set(0);
+    }
+
+    public int getRenewRetryTimes() {
+        return this.renewRetryTimes.get();
     }
 }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
index ce68fb2db..d2f447273 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
@@ -17,17 +17,81 @@
 
 package org.apache.rocketmq.proxy.common;
 
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
 import java.util.Map;
-import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
 
 public class ReceiptHandleGroup {
-    private final Map<String /* msgID */, Map<String /* original handle */, MessageReceiptHandle>> receiptHandleMap = new ConcurrentHashMap<>();
+    protected final Map<String /* msgID */, Map<String /* original handle */, HandleData>> receiptHandleMap = new ConcurrentHashMap<>();
+
+    public static class HandleData {
+        private final Semaphore semaphore = new Semaphore(1);
+        private volatile boolean needRemove = false;
+        private volatile MessageReceiptHandle messageReceiptHandle;
+
+        public HandleData(MessageReceiptHandle messageReceiptHandle) {
+            this.messageReceiptHandle = messageReceiptHandle;
+        }
+
+        public boolean lock(long timeoutMs) {
+            try {
+                return this.semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                return false;
+            }
+        }
+
+        public void unlock() {
+            this.semaphore.release();
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            return this == o;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(semaphore, needRemove, messageReceiptHandle);
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this)
+                .add("semaphore", semaphore)
+                .add("needRemove", needRemove)
+                .add("messageReceiptHandle", messageReceiptHandle)
+                .toString();
+        }
+    }
 
     public void put(String msgID, String handle, MessageReceiptHandle value) {
-        Map<String, MessageReceiptHandle> handleMap = receiptHandleMap.computeIfAbsent(msgID, msgIDKey -> new ConcurrentHashMap<>());
-        handleMap.put(handle, value);
+        long timeout = ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup();
+        Map<String, HandleData> handleMap = receiptHandleMap.computeIfAbsent(msgID, msgIDKey -> new ConcurrentHashMap<>());
+        handleMap.compute(handle, (handleKey, handleData) -> {
+            if (handleData == null || handleData.needRemove) {
+                return new HandleData(value);
+            }
+            if (!handleData.lock(timeout)) {
+                throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "try to put handle failed");
+            }
+            try {
+                if (handleData.needRemove) {
+                    return new HandleData(value);
+                }
+                handleData.messageReceiptHandle = value;
+            } finally {
+                handleData.unlock();
+            }
+            return handleData;
+        });
     }
 
     public boolean isEmpty() {
@@ -35,31 +99,71 @@ public class ReceiptHandleGroup {
     }
 
     public MessageReceiptHandle remove(String msgID, String handle) {
-        AtomicReference<MessageReceiptHandle> resRef = new AtomicReference<>();
-        receiptHandleMap.computeIfPresent(msgID, (msgIDKey, handleMap) -> {
-            resRef.set(handleMap.remove(handle));
-            if (handleMap.isEmpty()) {
+        Map<String, HandleData> handleMap = this.receiptHandleMap.get(msgID);
+        if (handleMap == null) {
+            return null;
+        }
+        long timeout = ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup();
+        AtomicReference<MessageReceiptHandle> res = new AtomicReference<>();
+        handleMap.computeIfPresent(handle, (handleKey, handleData) -> {
+            if (!handleData.lock(timeout)) {
+                throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "try to remove and get handle failed");
+            }
+            try {
+                if (!handleData.needRemove) {
+                    handleData.needRemove = true;
+                    res.set(handleData.messageReceiptHandle);
+                }
                 return null;
+            } finally {
+                handleData.unlock();
             }
-            return handleMap;
         });
-        return resRef.get();
+        removeHandleMapKeyIfNeed(msgID);
+        return res.get();
     }
 
-    public MessageReceiptHandle removeOne(String msgID) {
-        AtomicReference<MessageReceiptHandle> resRef = new AtomicReference<>();
-        receiptHandleMap.computeIfPresent(msgID, (msgIDKey, handleMap) -> {
-            if (handleMap.isEmpty()) {
-                return null;
+    public void computeIfPresent(String msgID, String handle,
+        Function<MessageReceiptHandle, CompletableFuture<MessageReceiptHandle>> function) {
+        Map<String, HandleData> handleMap = this.receiptHandleMap.get(msgID);
+        if (handleMap == null) {
+            return;
+        }
+        long timeout = ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup();
+        handleMap.computeIfPresent(handle, (handleKey, handleData) -> {
+            if (!handleData.lock(timeout)) {
+                throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "try to compute failed");
             }
-            Optional<String> handleKey = handleMap.keySet().stream().findAny();
-            resRef.set(handleMap.remove(handleKey.get()));
+            CompletableFuture<MessageReceiptHandle> future = function.apply(handleData.messageReceiptHandle);
+            future.whenComplete((messageReceiptHandle, throwable) -> {
+                try {
+                    if (throwable != null) {
+                        return;
+                    }
+                    if (messageReceiptHandle == null) {
+                        handleData.needRemove = true;
+                    } else {
+                        handleData.messageReceiptHandle = messageReceiptHandle;
+                    }
+                } finally {
+                    handleData.unlock();
+                }
+                if (handleData.needRemove) {
+                    handleMap.remove(handleKey, handleData);
+                }
+                removeHandleMapKeyIfNeed(msgID);
+            });
+            return handleData;
+        });
+    }
+
+    protected void removeHandleMapKeyIfNeed(String msgID) {
+        this.receiptHandleMap.computeIfPresent(msgID, (msgIDKey, handleMap) -> {
             if (handleMap.isEmpty()) {
                 return null;
             }
             return handleMap;
         });
-        return resRef.get();
     }
 
     public interface DataScanner {
@@ -69,8 +173,15 @@ public class ReceiptHandleGroup {
     public void scan(DataScanner scanner) {
         this.receiptHandleMap.forEach((msgID, handleMap) -> {
             handleMap.forEach((handleStr, v) -> {
-                scanner.onData(msgID, handleStr, v);
+                scanner.onData(msgID, handleStr, v.messageReceiptHandle);
             });
         });
     }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("receiptHandleMap", receiptHandleMap)
+            .toString();
+    }
 }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index 9573c3aa0..5a605f28b 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -140,6 +140,11 @@ public class ProxyConfig implements ConfigFile {
 
     private long invisibleTimeMillisWhenClear = 1000L;
     private boolean enableProxyAutoRenew = true;
+    private int maxRenewRetryTimes = 3;
+    private int renewThreadPoolNums = 2;
+    private int renewMaxThreadPoolNums = 4;
+    private int renewThreadPoolQueueCapacity = 300;
+    private long lockTimeoutMsInHandleGroup = TimeUnit.SECONDS.toMillis(3);
     private long renewAheadTimeMillis = TimeUnit.SECONDS.toMillis(10);
     private long renewSliceTimeMillis = TimeUnit.SECONDS.toMillis(60);
     private long renewMaxTimeMillis = TimeUnit.HOURS.toMillis(3);
@@ -789,6 +794,46 @@ public class ProxyConfig implements ConfigFile {
         this.enableProxyAutoRenew = enableProxyAutoRenew;
     }
 
+    public int getMaxRenewRetryTimes() {
+        return maxRenewRetryTimes;
+    }
+
+    public void setMaxRenewRetryTimes(int maxRenewRetryTimes) {
+        this.maxRenewRetryTimes = maxRenewRetryTimes;
+    }
+
+    public int getRenewThreadPoolNums() {
+        return renewThreadPoolNums;
+    }
+
+    public void setRenewThreadPoolNums(int renewThreadPoolNums) {
+        this.renewThreadPoolNums = renewThreadPoolNums;
+    }
+
+    public int getRenewMaxThreadPoolNums() {
+        return renewMaxThreadPoolNums;
+    }
+
+    public void setRenewMaxThreadPoolNums(int renewMaxThreadPoolNums) {
+        this.renewMaxThreadPoolNums = renewMaxThreadPoolNums;
+    }
+
+    public int getRenewThreadPoolQueueCapacity() {
+        return renewThreadPoolQueueCapacity;
+    }
+
+    public void setRenewThreadPoolQueueCapacity(int renewThreadPoolQueueCapacity) {
+        this.renewThreadPoolQueueCapacity = renewThreadPoolQueueCapacity;
+    }
+
+    public long getLockTimeoutMsInHandleGroup() {
+        return lockTimeoutMsInHandleGroup;
+    }
+
+    public void setLockTimeoutMsInHandleGroup(long lockTimeoutMsInHandleGroup) {
+        this.lockTimeoutMsInHandleGroup = lockTimeoutMsInHandleGroup;
+    }
+
     public long getRenewAheadTimeMillis() {
         return renewAheadTimeMillis;
     }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
index 5f4c85af7..fa2241cb3 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
@@ -100,7 +100,7 @@ public class AckMessageActivity extends AbstractMessingActivity {
             String group = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup());
             MessageReceiptHandle messageReceiptHandle = receiptHandleProcessor.removeReceiptHandle(ctx.getClientID(), group, ackMessageEntry.getMessageId(), ackMessageEntry.getReceiptHandle());
             if (messageReceiptHandle != null) {
-                handleString = messageReceiptHandle.getReceiptHandle();
+                handleString = messageReceiptHandle.getReceiptHandleStr();
             }
             CompletableFuture<AckResult> ackResultFuture = this.messagingProcessor.ackMessage(
                 ctx,
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java
index 367e5aa33..680364072 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java
@@ -57,7 +57,7 @@ public class ChangeInvisibleDurationActivity extends AbstractMessingActivity {
 
             MessageReceiptHandle messageReceiptHandle = receiptHandleProcessor.removeReceiptHandle(ctx.getClientID(), group, request.getMessageId(), receiptHandle.getReceiptHandle());
             if (messageReceiptHandle != null) {
-                receiptHandle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandle());
+                receiptHandle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr());
             }
             return this.messagingProcessor.changeInvisibleTime(
                 ctx,
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
index 555a036ee..49763dcdb 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
@@ -123,7 +123,7 @@ public class ReceiveMessageActivity extends AbstractMessingActivity {
                             if (receiptHandle != null) {
                                 MessageReceiptHandle messageReceiptHandle =
                                     new MessageReceiptHandle(group, topic, messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(),
-                                        messageExt.getQueueOffset(), messageExt.getReconsumeTimes(), proxyConfig.getRenewMaxTimeMillis());
+                                        messageExt.getQueueOffset(), messageExt.getReconsumeTimes());
                                 receiptHandleProcessor.addReceiptHandle(ctx.getClientID(), group, messageExt.getMsgId(), receiptHandle, messageReceiptHandle);
                             }
                         }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java
index dd5d534e2..dec52f3c2 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java
@@ -50,7 +50,7 @@ public class ForwardMessageToDLQActivity extends AbstractMessingActivity {
             String handleString = request.getReceiptHandle();
             MessageReceiptHandle messageReceiptHandle = receiptHandleProcessor.removeReceiptHandle(ctx.getClientID(), group, request.getMessageId(), request.getReceiptHandle());
             if (messageReceiptHandle != null) {
-                handleString = messageReceiptHandle.getReceiptHandle();
+                handleString = messageReceiptHandle.getReceiptHandleStr();
             }
             ReceiptHandle receiptHandle = ReceiptHandle.decode(handleString);
 
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index dc91061fd..9bb8b7f9f 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -17,16 +17,18 @@
 
 package org.apache.rocketmq.proxy.processor;
 
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import com.google.common.base.Stopwatch;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
 import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
@@ -41,8 +43,11 @@ import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
 import org.apache.rocketmq.proxy.common.AbstractStartAndShutdown;
 import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
 import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.common.ProxyException;
+import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
 import org.apache.rocketmq.proxy.common.ReceiptHandleGroup;
 import org.apache.rocketmq.proxy.common.StartAndShutdown;
+import org.apache.rocketmq.proxy.common.utils.ExceptionUtils;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.config.ProxyConfig;
 import org.slf4j.Logger;
@@ -50,17 +55,45 @@ import org.slf4j.LoggerFactory;
 
 public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
     protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
-    protected final ConcurrentMap<String, ReceiptHandleGroup> receiptHandleGroupMap;
+    protected final ConcurrentMap<ReceiptHandleGroupKey, ReceiptHandleGroup> receiptHandleGroupMap;
     protected final ScheduledExecutorService scheduledExecutorService =
         Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RenewalScheduledThread_"));
-    protected final ExecutorService renewalWorkerService = ThreadPoolMonitor.createAndMonitor(
-        2, 4, 0, TimeUnit.MILLISECONDS,
-        "RenewalWorkerThread_", 10000
-    );
+    protected ThreadPoolExecutor renewalWorkerService;
     protected final MessagingProcessor messagingProcessor;
 
     public ReceiptHandleProcessor(MessagingProcessor messagingProcessor) {
         this.messagingProcessor = messagingProcessor;
+        this.receiptHandleGroupMap = new ConcurrentHashMap<>();
+        ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+        this.renewalWorkerService = ThreadPoolMonitor.createAndMonitor(
+            proxyConfig.getRenewThreadPoolNums(),
+            proxyConfig.getRenewMaxThreadPoolNums(),
+            1, TimeUnit.MINUTES,
+            "RenewalWorkerThread",
+            proxyConfig.getRenewThreadPoolQueueCapacity()
+        );
+        this.init();
+    }
+
+    protected void init() {
+        this.registerConsumerListener();
+        this.renewalWorkerService.setRejectedExecutionHandler((r, executor) -> log.warn("add renew task failed. queueSize:{}", executor.getQueue().size()));
+        this.appendStartAndShutdown(new StartAndShutdown() {
+            @Override
+            public void start() throws Exception {
+                scheduledExecutorService.scheduleWithFixedDelay(() -> scheduleRenewTask(), 0,
+                    ConfigurationManager.getProxyConfig().getRenewSchedulePeriodMillis(), TimeUnit.MILLISECONDS);
+            }
+
+            @Override
+            public void shutdown() throws Exception {
+                scheduledExecutorService.shutdown();
+                clearAllHandle();
+            }
+        });
+    }
+
+    protected void registerConsumerListener() {
         this.messagingProcessor.registerConsumerListener(new ConsumerIdsChangeListener() {
             @Override
             public void handle(ConsumerGroupEvent event, String group, Object... args) {
@@ -70,7 +103,7 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
                     }
                     if (args[0] instanceof ClientChannelInfo) {
                         ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
-                        clearGroup(buildKey(clientChannelInfo.getClientId(), group));
+                        clearGroup(new ReceiptHandleGroupKey(clientChannelInfo.getClientId(), group));
                     }
                 }
             }
@@ -80,25 +113,6 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
 
             }
         });
-        this.receiptHandleGroupMap = new ConcurrentHashMap<>();
-        this.init();
-    }
-
-    protected void init() {
-        this.appendStartAndShutdown(new StartAndShutdown() {
-            @Override
-            public void start() throws Exception {
-                log.info("scan for renewal start.");
-                scheduledExecutorService.scheduleAtFixedRate(() -> scheduleRenewTask(), 0,
-                    ConfigurationManager.getProxyConfig().getRenewSchedulePeriodMillis(), TimeUnit.MILLISECONDS);
-                log.info("renewal queue has started");
-            }
-
-            @Override
-            public void shutdown() throws Exception {
-                scheduledExecutorService.shutdown();
-            }
-        });
     }
 
     protected ProxyContext createContext(String actionName) {
@@ -106,90 +120,121 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
     }
 
     protected void scheduleRenewTask() {
-        ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
-        for (Map.Entry<String, ReceiptHandleGroup> entry : receiptHandleGroupMap.entrySet()) {
-            String key = entry.getKey();
-            Pair<String, String> clientIdAndGroup = parseKey(key);
-            if (clientIdAndGroup == null) {
-                log.warn("client id and group is empty. key:{}, receiptHandleGroup:{}", key, entry.getValue());
-                clearGroup(key);
-                continue;
-            }
-            if (clientIsOffline(clientIdAndGroup.getLeft(), clientIdAndGroup.getRight())) {
-                clearGroup(key);
-                continue;
-            }
-
-            ReceiptHandleGroup group = entry.getValue();
-            group.scan((msgID, handleStr, v) -> {
-                ReceiptHandle handle = ReceiptHandle.decode(v.getReceiptHandle());
-                long now = System.currentTimeMillis();
-                if (handle.getNextVisibleTime() - now > proxyConfig.getRenewAheadTimeMillis()) {
-                    return;
+        Stopwatch stopwatch = Stopwatch.createStarted(); // measures how long this scan takes; reported in the final log line
+        try {
+            ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+            for (Map.Entry<ReceiptHandleGroupKey, ReceiptHandleGroup> entry : receiptHandleGroupMap.entrySet()) {
+                ReceiptHandleGroupKey key = entry.getKey();
+                if (clientIsOffline(key)) { // client reported offline: clear its group and skip the scan
+                    clearGroup(key);
+                    continue;
                 }
-                SubscriptionGroupConfig subscriptionGroupConfig =
-                    messagingProcessor.getMetadataService().getSubscriptionGroupConfig(v.getGroup());
-                if (subscriptionGroupConfig == null) {
-                    log.error("Group's subscriptionGroupConfig is null, group = {}", v.getGroup());
-                    return;
-                }
-                RetryPolicy retryPolicy = subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy();
-                renewalWorkerService.submit(() -> renewMessage(key, msgID, v, handle, retryPolicy));
-            });
+
+                ReceiptHandleGroup group = entry.getValue();
+                group.scan((msgID, handleStr, v) -> {
+                    long current = System.currentTimeMillis();
+                    ReceiptHandle handle = ReceiptHandle.decode(v.getReceiptHandleStr());
+                    if (handle.getNextVisibleTime() - current > proxyConfig.getRenewAheadTimeMillis()) { // more than renewAheadTimeMillis left before the next visible time: not due yet
+                        return;
+                    }
+                    renewalWorkerService.submit(() -> renewMessage(group, msgID, handleStr)); // due: submit the renewal to renewalWorkerService
+                });
+            }
+        } catch (Exception e) { // the try wraps the whole loop, so an exception ends this pass early
+            log.error("unexpect error when schedule renew task", e);
         }
 
-        log.info("scan for renewal done.");
+        log.info("scan for renewal done. cost:{}ms", stopwatch.elapsed().toMillis());
+    }
+
+    protected void renewMessage(ReceiptHandleGroup group, String msgID, String handleStr) { // renews one (msgID, handleStr) entry of the given group
+        try {
+            group.computeIfPresent(msgID, handleStr, this::startRenewMessage); // presumably applied only when the entry still exists — confirm in ReceiptHandleGroup
+        } catch (Exception e) {
+            log.error("error when renew message. msgID:{}, handleStr:{}", msgID, handleStr, e); // logged only; the exception is not rethrown
+        }
     }
 
-    protected void renewMessage(String key, String msgID, MessageReceiptHandle messageReceiptHandle,
-        ReceiptHandle handle, RetryPolicy retryPolicy) {
+    protected CompletableFuture<MessageReceiptHandle> startRenewMessage(MessageReceiptHandle messageReceiptHandle) { // completes with the handle to keep tracking, or with null (presumably treated as removal by ReceiptHandleGroup — confirm)
+        CompletableFuture<MessageReceiptHandle> resFuture = new CompletableFuture<>();
         ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
         ProxyContext context = createContext("RenewMessage");
+        ReceiptHandle handle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr());
         long current = System.currentTimeMillis();
-        if (current - messageReceiptHandle.getTimestamp() < messageReceiptHandle.getExpectInvisibleTime()) {
-            CompletableFuture<AckResult> future =
-                messagingProcessor.changeInvisibleTime(context, handle, messageReceiptHandle.getMessageId(),
-                    messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), proxyConfig.getRenewSliceTimeMillis());
-            future.thenAccept(ackResult -> {
-                if (AckStatus.OK.equals(ackResult.getStatus())) {
-                    messageReceiptHandle.update(ackResult.getExtraInfo());
-                    addReceiptHandle(key, msgID, messageReceiptHandle.getOriginalReceiptHandle(), messageReceiptHandle);
-                }
-            });
-        } else {
-            CompletableFuture<AckResult> future = messagingProcessor.changeInvisibleTime(context,
-                handle, messageReceiptHandle.getMessageId(), messageReceiptHandle.getGroup(),
-                messageReceiptHandle.getTopic(), retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()));
-            future.thenAccept(ackResult -> {
-                if (AckStatus.OK.equals(ackResult.getStatus())) {
-                    removeReceiptHandle(key, msgID, messageReceiptHandle.getOriginalReceiptHandle());
+        try {
+            if (messageReceiptHandle.getRenewRetryTimes() >= proxyConfig.getMaxRenewRetryTimes()) { // retry budget used up: complete with null instead of calling changeInvisibleTime again
+                log.warn("handle has exceed max renewRetryTimes. handle:{}", messageReceiptHandle);
+                return CompletableFuture.completedFuture(null);
+            }
+            if (current - messageReceiptHandle.getTimestamp() < messageReceiptHandle.getExpectInvisibleTime()) { // expected invisible time not yet elapsed: extend by renewSliceTimeMillis
+                CompletableFuture<AckResult> future =
+                    messagingProcessor.changeInvisibleTime(context, handle, messageReceiptHandle.getMessageId(),
+                        messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), proxyConfig.getRenewSliceTimeMillis());
+                future.whenComplete((ackResult, throwable) -> {
+                    if (throwable != null) {
+                        log.error("error when renew. handle:{}", messageReceiptHandle, throwable);
+                        if (renewExceptionNeedRetry(throwable)) { // retryable failure: keep the handle and count the failed attempt
+                            messageReceiptHandle.incrementAndGetRenewRetryTimes();
+                            resFuture.complete(messageReceiptHandle);
+                        } else {
+                            resFuture.complete(null);
+                        }
+                    } else if (AckStatus.OK.equals(ackResult.getStatus())) {
+                        messageReceiptHandle.updateReceiptHandle(ackResult.getExtraInfo()); // success: store the handle string returned in the ack result
+                        messageReceiptHandle.resetRenewRetryTimes();
+                        resFuture.complete(messageReceiptHandle);
+                    } else {
+                        log.error("renew response is not ok. result:{}, handle:{}", ackResult, messageReceiptHandle); // throwable is always null in this branch, so it is not passed to the logger
+                        resFuture.complete(null);
+                    }
+                });
+            } else { // expected invisible time has elapsed: change invisible time to the retry policy's next delay instead of renewing
+                SubscriptionGroupConfig subscriptionGroupConfig =
+                    messagingProcessor.getMetadataService().getSubscriptionGroupConfig(messageReceiptHandle.getGroup());
+                if (subscriptionGroupConfig == null) {
+                    log.error("group's subscriptionGroupConfig is null when renew. handle: {}", messageReceiptHandle);
+                    return CompletableFuture.completedFuture(null);
                 }
-            });
+                RetryPolicy retryPolicy = subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy();
+                CompletableFuture<AckResult> future = messagingProcessor.changeInvisibleTime(context,
+                    handle, messageReceiptHandle.getMessageId(), messageReceiptHandle.getGroup(),
+                    messageReceiptHandle.getTopic(), retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()));
+                future.whenComplete((ackResult, throwable) -> {
+                    if (throwable != null) {
+                        log.error("error when nack in renew. handle:{}", messageReceiptHandle, throwable);
+                    }
+                    resFuture.complete(null); // completes with null whether or not the call failed
+                });
+            }
+        } catch (Throwable t) {
+            log.error("unexpect error when renew message, stop to renew it. handle:{}", messageReceiptHandle, t);
+            resFuture.complete(null);
         }
+        return resFuture;
     }
 
-    protected String buildKey(String clientID, String group) {
-        return clientID + "%" + group;
-    }
-
-    protected Pair<String, String> parseKey(String key) {
-        String[] strs = key.split("%");
-        if (strs.length < 2) {
-            return null;
+    protected boolean renewExceptionNeedRetry(Throwable t) { // decides whether a failed renew should be attempted again
+        t = ExceptionUtils.getRealException(t); // assumes this unwraps wrapper exceptions to the underlying cause — TODO confirm
+        if (t instanceof ProxyException) {
+            ProxyException proxyException = (ProxyException) t;
+            if (ProxyExceptionCode.INVALID_BROKER_NAME.equals(proxyException.getCode()) ||
+                ProxyExceptionCode.INVALID_RECEIPT_HANDLE.equals(proxyException.getCode())) {
+                return false; // these two codes are treated as non-retryable
+            }
         }
-        return Pair.of(strs[0], strs[1]);
+        return true; // every other failure is retried
     }
 
-    protected boolean clientIsOffline(String clientID, String group) {
-        return this.messagingProcessor.findConsumerChannel(createContext("JudgeClientOnline"), group, clientID) == null;
+    protected boolean clientIsOffline(ReceiptHandleGroupKey groupKey) { // offline == no consumer channel found for (group, clientId)
+        return this.messagingProcessor.findConsumerChannel(createContext("JudgeClientOnline"), groupKey.group, groupKey.clientId) == null;
     }
 
     public void addReceiptHandle(String clientID, String group, String msgID, String receiptHandle,
         MessageReceiptHandle messageReceiptHandle) {
-        this.addReceiptHandle(buildKey(clientID, group), msgID, receiptHandle, messageReceiptHandle);
+        this.addReceiptHandle(new ReceiptHandleGroupKey(clientID, group), msgID, receiptHandle, messageReceiptHandle); // wraps (clientID, group) in a key object and delegates to the key-based overload
     }
 
-    protected void addReceiptHandle(String key, String msgID, String receiptHandle,
+    protected void addReceiptHandle(ReceiptHandleGroupKey key, String msgID, String receiptHandle,
         MessageReceiptHandle messageReceiptHandle) {
         if (key == null) {
             return;
@@ -199,44 +244,21 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
     }
 
     public MessageReceiptHandle removeReceiptHandle(String clientID, String group, String msgID, String receiptHandle) {
-        return this.removeReceiptHandle(buildKey(clientID, group), msgID, receiptHandle);
+        return this.removeReceiptHandle(new ReceiptHandleGroupKey(clientID, group), msgID, receiptHandle); // wraps (clientID, group) in a key object and delegates to the key-based overload
     }
 
-    protected MessageReceiptHandle removeReceiptHandle(String key, String msgID, String receiptHandle) {
+    protected MessageReceiptHandle removeReceiptHandle(ReceiptHandleGroupKey key, String msgID, String receiptHandle) { // null when key is null or no group is registered for it; otherwise the result of the group's remove
         if (key == null) {
             return null;
         }
-        AtomicReference<MessageReceiptHandle> res = new AtomicReference<>();
-        receiptHandleGroupMap.computeIfPresent(key, (k, v) -> {
-            res.set(v.remove(msgID, receiptHandle));
-            if (v.isEmpty()) {
-                return null;
-            }
-            return v;
-        });
-        return res.get();
-    }
-
-    public MessageReceiptHandle removeOneReceiptHandle(String clientID, String group, String msgID) {
-        return removeOneReceiptHandle(buildKey(clientID, group), msgID);
-    }
-
-    protected MessageReceiptHandle removeOneReceiptHandle(String key, String msgID) {
-        if (key == null) {
+        ReceiptHandleGroup handleGroup = receiptHandleGroupMap.get(key); // plain lookup: this method does not remove the group entry from receiptHandleGroupMap
+        if (handleGroup == null) {
             return null;
         }
-        AtomicReference<MessageReceiptHandle> res = new AtomicReference<>();
-        receiptHandleGroupMap.computeIfPresent(key, (k, v) -> {
-            res.set(v.removeOne(msgID));
-            if (v.isEmpty()) {
-                return null;
-            }
-            return v;
-        });
-        return res.get();
+        return handleGroup.remove(msgID, receiptHandle); // removal of the single handle is delegated to the group
     }
 
-    protected void clearGroup(String key) {
+    protected void clearGroup(ReceiptHandleGroupKey key) {
         if (key == null) {
             return;
         }
@@ -246,16 +268,75 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
         if (handleGroup == null) {
             return;
         }
-        handleGroup.scan((msgID, handle, messageReceiptHandle) -> {
-            ReceiptHandle receiptHandle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandle());
-            messagingProcessor.changeInvisibleTime(
-                context,
-                receiptHandle,
-                messageReceiptHandle.getMessageId(),
-                messageReceiptHandle.getGroup(),
-                messageReceiptHandle.getTopic(),
-                proxyConfig.getInvisibleTimeMillisWhenClear()
-            );
+        handleGroup.scan((msgID, handle, v) -> {
+            try {
+                handleGroup.computeIfPresent(msgID, handle, messageReceiptHandle -> {
+                    ReceiptHandle receiptHandle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr());
+                    messagingProcessor.changeInvisibleTime(
+                        context,
+                        receiptHandle,
+                        messageReceiptHandle.getMessageId(),
+                        messageReceiptHandle.getGroup(),
+                        messageReceiptHandle.getTopic(),
+                        proxyConfig.getInvisibleTimeMillisWhenClear()
+                    );
+                    return CompletableFuture.completedFuture(null);
+                });
+            } catch (Exception e) {
+                log.error("error when clear handle for group. key:{}", key, e);
+            }
         });
     }
+
+    protected void clearAllHandle() { // calls clearGroup for every key currently in receiptHandleGroupMap
+        log.info("start clear all handle in receiptHandleProcessor");
+        Set<ReceiptHandleGroupKey> keySet = receiptHandleGroupMap.keySet();
+        for (ReceiptHandleGroupKey key : keySet) {
+            clearGroup(key);
+        }
+        log.info("clear all handle in receiptHandleProcessor done");
+    }
+
+    public static class ReceiptHandleGroupKey { // key type of receiptHandleGroupMap: a (clientId, group) pair
+        private final String clientId;
+        private final String group;
+
+        public ReceiptHandleGroupKey(String clientId, String group) {
+            this.clientId = clientId;
+            this.group = group;
+        }
+
+        public String getClientId() {
+            return clientId;
+        }
+
+        public String getGroup() {
+            return group;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            ReceiptHandleGroupKey key = (ReceiptHandleGroupKey) o;
+            return Objects.equal(clientId, key.clientId) && Objects.equal(group, key.group); // equal when both clientId and group are equal
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(clientId, group); // derived from the same two fields that equals compares
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this)
+                .add("clientId", clientId)
+                .add("group", group)
+                .toString();
+        }
+    }
 }
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupTest.java
new file mode 100644
index 000000000..6aca7ac6f
--- /dev/null
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.proxy.common;
+
+import java.time.Duration;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.rocketmq.common.consumer.ReceiptHandle;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
+import org.apache.rocketmq.proxy.common.utils.FutureUtils;
+import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class ReceiptHandleGroupTest extends InitConfigAndLoggerTest { // concurrency tests for ReceiptHandleGroup.remove and computeIfPresent
+
+    private static final String TOPIC = "topic";
+    private static final String GROUP = "group";
+    private ReceiptHandleGroup receiptHandleGroup;
+    private String msgID;
+    private final Random random = new Random();
+
+    @Before
+    public void before() throws Throwable {
+        super.before();
+        receiptHandleGroup = new ReceiptHandleGroup(); // fresh group and message id for every test
+        msgID = MessageClientIDSetter.createUniqID();
+    }
+
+    protected String createHandle() { // builds an encoded receipt handle with random queueId and offset in [0, 10)
+        return ReceiptHandle.builder()
+            .startOffset(0L)
+            .retrieveTime(System.currentTimeMillis())
+            .invisibleTime(3000)
+            .reviveQueueId(1)
+            .topicType(ReceiptHandle.NORMAL_TOPIC)
+            .brokerName("brokerName")
+            .queueId(random.nextInt(10))
+            .offset(random.nextInt(10))
+            .commitLogOffset(0L)
+            .build().encode();
+    }
+
+    @Test
+    public void testRemoveWhenComputeIfPresent() { // remove racing with a computeIfPresent that updates the handle: remove must return the updated handle
+        String handle1 = createHandle();
+        String handle2 = createHandle();
+        AtomicReference<MessageReceiptHandle> removeHandleRef = new AtomicReference<>();
+
+        receiptHandleGroup.put(msgID, handle1, createMessageReceiptHandle(handle1, msgID));
+        CountDownLatch latch = new CountDownLatch(2); // both threads rendezvous: one just before remove, the other inside the compute function
+        Thread removeThread = new Thread(() -> {
+            try {
+                latch.countDown();
+                latch.await();
+                removeHandleRef.set(receiptHandleGroup.remove(msgID, handle1));
+            } catch (Exception ignored) {
+            }
+        }, "removeThread");
+        Thread computeThread = new Thread(() -> {
+            try {
+                receiptHandleGroup.computeIfPresent(msgID, handle1, messageReceiptHandle -> {
+                    try {
+                        latch.countDown();
+                        latch.await();
+                    } catch (Exception ignored) {
+                    }
+                    messageReceiptHandle.updateReceiptHandle(handle2);
+                    return FutureUtils.addExecutor(CompletableFuture.completedFuture(messageReceiptHandle), Executors.newCachedThreadPool()); // NOTE(review): this cached thread pool is never shut down
+                });
+            } catch (Exception ignored) {
+            }
+        }, "computeThread");
+        removeThread.start();
+        computeThread.start();
+
+        await().atMost(Duration.ofSeconds(1)).until(() -> removeHandleRef.get() != null);
+        assertEquals(handle2, removeHandleRef.get().getReceiptHandleStr()); // the removed entry carries the handle string written by the compute function
+        assertTrue(receiptHandleGroup.isEmpty());
+    }
+
+    @Test
+    public void testRemoveWhenComputeIfPresentReturnNull() { // remove racing with a computeIfPresent whose future completes with null: remove must return null
+        String handle1 = createHandle();
+        AtomicBoolean removeCalled = new AtomicBoolean(false);
+        AtomicReference<MessageReceiptHandle> removeHandleRef = new AtomicReference<>();
+
+        receiptHandleGroup.put(msgID, handle1, createMessageReceiptHandle(handle1, msgID));
+        CountDownLatch latch = new CountDownLatch(2);
+        Thread removeThread = new Thread(() -> {
+            try {
+                latch.countDown();
+                latch.await();
+                removeHandleRef.set(receiptHandleGroup.remove(msgID, handle1));
+                removeCalled.set(true); // set after remove returns, so the test can wait for the call even though the result is null
+            } catch (Exception ignored) {
+            }
+        }, "removeThread");
+        Thread computeThread = new Thread(() -> {
+            try {
+                receiptHandleGroup.computeIfPresent(msgID, handle1, messageReceiptHandle -> {
+                    try {
+                        latch.countDown();
+                        latch.await();
+                    } catch (Exception ignored) {
+                    }
+                    return FutureUtils.addExecutor(CompletableFuture.completedFuture(null), Executors.newCachedThreadPool()); // NOTE(review): this cached thread pool is never shut down
+                });
+            } catch (Exception ignored) {
+            }
+        }, "computeThread");
+        removeThread.start();
+        computeThread.start();
+
+        await().atMost(Duration.ofSeconds(1)).until(removeCalled::get);
+        assertNull(removeHandleRef.get());
+        assertTrue(receiptHandleGroup.isEmpty());
+    }
+
+    @Test
+    public void testRemoveMultiThread() { // many threads remove the same handle at once: exactly one may get a non-null result
+        String handle1 = createHandle();
+        AtomicReference<MessageReceiptHandle> removeHandleRef = new AtomicReference<>();
+        AtomicInteger count = new AtomicInteger();
+
+        receiptHandleGroup.put(msgID, handle1, createMessageReceiptHandle(handle1, msgID));
+        int threadNum = Math.max(Runtime.getRuntime().availableProcessors(), 3); // at least three competing threads
+        CountDownLatch latch = new CountDownLatch(threadNum);
+        for (int i = 0; i < threadNum; i++) {
+            Thread thread = new Thread(() -> {
+                try {
+                    latch.countDown();
+                    latch.await();
+                    MessageReceiptHandle handle = receiptHandleGroup.remove(msgID, handle1);
+                    if (handle != null) {
+                        removeHandleRef.set(handle);
+                        count.incrementAndGet(); // counts the removals that returned a handle
+                    }
+                } catch (Exception ignored) {
+                }
+            });
+            thread.start();
+        }
+
+        await().atMost(Duration.ofSeconds(1)).untilAsserted(() -> assertEquals(1, count.get()));
+        assertEquals(handle1, removeHandleRef.get().getReceiptHandleStr());
+        assertTrue(receiptHandleGroup.isEmpty());
+    }
+
+    private MessageReceiptHandle createMessageReceiptHandle(String handle, String msgID) { // fixture using the GROUP and TOPIC constants and zero for the numeric arguments
+        return new MessageReceiptHandle(GROUP, TOPIC, 0, handle, msgID, 0, 0);
+    }
+}
\ No newline at end of file
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java
index baa7a59ef..012649465 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java
@@ -93,7 +93,7 @@ public class ChangeInvisibleDurationActivityTest extends BaseActivityTest {
             any(), receiptHandleCaptor.capture(), anyString(), anyString(), anyString(), invisibleTimeArgumentCaptor.capture()
         )).thenReturn(CompletableFuture.completedFuture(ackResult));
         when(receiptHandleProcessor.removeReceiptHandle(anyString(), anyString(), anyString(), anyString()))
-            .thenReturn(new MessageReceiptHandle("group", "topic", 0, savedHandleStr, "msgId", 0, 0, 3000));
+            .thenReturn(new MessageReceiptHandle("group", "topic", 0, savedHandleStr, "msgId", 0, 0));
 
         ChangeInvisibleDurationResponse response = this.changeInvisibleDurationActivity.changeInvisibleDuration(
             createContext(),
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java
index f3a6229a7..cd3c48e1a 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java
@@ -76,7 +76,7 @@ public class ForwardMessageToDLQActivityTest extends BaseActivityTest {
 
         String savedHandleStr = buildReceiptHandle("topic", System.currentTimeMillis(),3000);
         when(receiptHandleProcessor.removeReceiptHandle(anyString(), anyString(), anyString(), anyString()))
-            .thenReturn(new MessageReceiptHandle("group", "topic", 0, savedHandleStr, "msgId", 0, 0, 3000));
+            .thenReturn(new MessageReceiptHandle("group", "topic", 0, savedHandleStr, "msgId", 0, 0));
 
         ForwardMessageToDeadLetterQueueResponse response = this.forwardMessageToDLQActivity.forwardMessageToDeadLetterQueue(
             createContext(),
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
index 1f8633258..93ab0210c 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
@@ -17,21 +17,38 @@
 
 package org.apache.rocketmq.proxy.processor;
 
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
 import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
 import org.apache.rocketmq.client.consumer.AckResult;
 import org.apache.rocketmq.client.consumer.AckStatus;
+import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.consumer.ReceiptHandle;
 import org.apache.rocketmq.common.message.MessageClientIDSetter;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.proxy.common.ContextVariable;
 import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
 import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.common.ProxyException;
+import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
+import org.apache.rocketmq.proxy.common.ReceiptHandleGroup;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.config.ProxyConfig;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class ReceiptHandleProcessorTest extends BaseProcessorTest {
     private ReceiptHandleProcessor receiptHandleProcessor;
@@ -43,30 +60,32 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
     private static final int QUEUE_ID = 1;
     private static final String MESSAGE_ID = "messageId";
     private static final long OFFSET = 123L;
-    private static final long INVISIBLE_TIME = 100000L;
+    private static final long INVISIBLE_TIME = 60000L;
     private static final int RECONSUME_TIMES = 1;
     private static final String MSG_ID = MessageClientIDSetter.createUniqID();
     private MessageReceiptHandle messageReceiptHandle;
 
-    private final String receiptHandle = ReceiptHandle.builder()
-        .startOffset(0L)
-        .retrieveTime(0)
-        .invisibleTime(INVISIBLE_TIME)
-        .reviveQueueId(1)
-        .topicType(ReceiptHandle.NORMAL_TOPIC)
-        .brokerName(BROKER_NAME)
-        .queueId(QUEUE_ID)
-        .offset(OFFSET)
-        .commitLogOffset(0L)
-        .build().encode();
+    private String receiptHandle;
 
     @Before
     public void setup() {
+        ProxyConfig config = ConfigurationManager.getProxyConfig();
+        receiptHandle = ReceiptHandle.builder()
+            .startOffset(0L)
+            .retrieveTime(System.currentTimeMillis() - INVISIBLE_TIME + config.getRenewAheadTimeMillis() - 5) // assumes nextVisibleTime = retrieveTime + invisibleTime, which would put the handle 5 ms inside the renew-ahead window — TODO confirm
+            .invisibleTime(INVISIBLE_TIME)
+            .reviveQueueId(1)
+            .topicType(ReceiptHandle.NORMAL_TOPIC)
+            .brokerName(BROKER_NAME)
+            .queueId(QUEUE_ID)
+            .offset(OFFSET)
+            .commitLogOffset(0L)
+            .build().encode();
         PROXY_CONTEXT.withVal(ContextVariable.CLIENT_ID, "channel-id");
         receiptHandleProcessor = new ReceiptHandleProcessor(messagingProcessor);
         Mockito.doNothing().when(messagingProcessor).registerConsumerListener(Mockito.any(ConsumerIdsChangeListener.class));
         messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, receiptHandle, MESSAGE_ID, OFFSET,
-            RECONSUME_TIMES, INVISIBLE_TIME);
+            RECONSUME_TIMES);
     }
 
     @Test
@@ -83,6 +102,7 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
 
     @Test
     public void testRenewReceiptHandle() {
+        ProxyConfig config = ConfigurationManager.getProxyConfig();
         String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
         receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID, receiptHandle, messageReceiptHandle);
         SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
@@ -91,7 +111,7 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
         long newInvisibleTime = 2000L;
         ReceiptHandle newReceiptHandleClass = ReceiptHandle.builder()
             .startOffset(0L)
-            .retrieveTime(0)
+            .retrieveTime(System.currentTimeMillis() - newInvisibleTime + config.getRenewAheadTimeMillis() - 5)
             .invisibleTime(newInvisibleTime)
             .reviveQueueId(1)
             .topicType(ReceiptHandle.NORMAL_TOPIC)
@@ -117,6 +137,114 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
                 Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis()));
     }
 
+    /**
+     * Verifies that a renew failing with a generic error is attempted exactly getMaxRenewRetryTimes() times and the handle's group ends up empty.
+     */
+    @Test
+    public void testRenewExceedMaxRenewTimes() {
+        ProxyConfig config = ConfigurationManager.getProxyConfig();
+        String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
+        Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channelId))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+        receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID, receiptHandle, messageReceiptHandle);
+
+        CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>();
+        ackResultFuture.completeExceptionally(new MQClientException(0, "error"));
+        Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
+            Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(config.getRenewSliceTimeMillis())))
+            .thenReturn(ackResultFuture);
+
+        // Re-trigger the renew task until the first tracked group reports empty; a missing group counts as "not yet".
+        await().atMost(Duration.ofSeconds(1)).until(() -> {
+            receiptHandleProcessor.scheduleRenewTask();
+            return receiptHandleProcessor.receiptHandleGroupMap.values().stream()
+                .findFirst().map(ReceiptHandleGroup::isEmpty).orElse(false);
+        });
+        Mockito.verify(messagingProcessor, Mockito.times(config.getMaxRenewRetryTimes()))
+            .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
+                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(config.getRenewSliceTimeMillis()));
+    }
+
+    /**
+     * Verifies that a renew failing with ProxyExceptionCode.INVALID_RECEIPT_HANDLE leaves the handle's group empty.
+     */
+    @Test
+    public void testRenewWithInvalidHandle() {
+        String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
+        Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channelId))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+        receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID, receiptHandle, messageReceiptHandle);
+
+        CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>();
+        ackResultFuture.completeExceptionally(new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "error"));
+        Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
+            Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getRenewSliceTimeMillis())))
+            .thenReturn(ackResultFuture);
+
+        // Re-trigger the renew task until the first tracked group reports empty; a missing group counts as "not yet".
+        await().atMost(Duration.ofSeconds(1)).until(() -> {
+            receiptHandleProcessor.scheduleRenewTask();
+            return receiptHandleProcessor.receiptHandleGroupMap.values().stream()
+                .findFirst().map(ReceiptHandleGroup::isEmpty).orElse(false);
+        });
+    }
+
+    /**
+     * Verifies the renew call count when failures are interrupted by one success: the scripted outcomes are
+     * 2 errors, 1 OK, then errors only, and exactly 6 calls are expected before the handle's group is empty.
+     * NOTE(review): 6 = 2 + 1 + 3 presumably relies on a success resetting the retry count and on a max of 3 retries - confirm.
+     */
+    @Test
+    public void testRenewWithErrorThenOK() {
+        ProxyConfig config = ConfigurationManager.getProxyConfig();
+        String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
+        Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channelId))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+        receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID, receiptHandle, messageReceiptHandle);
+
+        // Scripted outcomes are handed out in call order; count doubles as the number of stubbed calls made.
+        AtomicInteger count = new AtomicInteger(0);
+        List<CompletableFuture<AckResult>> futureList = new ArrayList<>();
+        CompletableFuture<AckResult> failedRenew = new CompletableFuture<>();
+        failedRenew.completeExceptionally(new MQClientException(0, "error"));
+        futureList.add(failedRenew);
+        futureList.add(failedRenew);
+
+        long newInvisibleTime = 2000L;
+        ReceiptHandle newReceiptHandleClass = ReceiptHandle.builder()
+            .startOffset(0L)
+            .retrieveTime(System.currentTimeMillis() - newInvisibleTime + config.getRenewAheadTimeMillis() - 5)
+            .invisibleTime(newInvisibleTime)
+            .reviveQueueId(1)
+            .topicType(ReceiptHandle.NORMAL_TOPIC)
+            .brokerName(BROKER_NAME)
+            .queueId(QUEUE_ID)
+            .offset(OFFSET)
+            .commitLogOffset(0L)
+            .build();
+        String newReceiptHandle = newReceiptHandleClass.encode();
+        AckResult ackResult = new AckResult();
+        ackResult.setStatus(AckStatus.OK);
+        ackResult.setExtraInfo(newReceiptHandle);
+        futureList.add(CompletableFuture.completedFuture(ackResult));
+
+        // Four trailing failures are scripted; the final assertion expects only three of them to be consumed.
+        futureList.add(failedRenew);
+        futureList.add(failedRenew);
+        futureList.add(failedRenew);
+        futureList.add(failedRenew);
+
+        Mockito.doAnswer((Answer<CompletableFuture<AckResult>>) invocation -> futureList.get(count.getAndIncrement()))
+            .when(messagingProcessor).changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
+                Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(config.getRenewSliceTimeMillis()));
+
+        // Re-trigger the renew task until the first tracked group reports empty; a missing group counts as "not yet".
+        await().atMost(Duration.ofSeconds(1)).until(() -> {
+            receiptHandleProcessor.scheduleRenewTask();
+            return receiptHandleProcessor.receiptHandleGroupMap.values().stream()
+                .findFirst().map(ReceiptHandleGroup::isEmpty).orElse(false);
+        });
+        // 2 failures + 1 success + 3 failures; the seventh scripted future must stay unused.
+        assertEquals(6, count.get());
+    }
+
     @Test
     public void testRenewReceiptHandleWhenTimeout() {
         long newInvisibleTime = 0L;
@@ -131,17 +259,59 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
             .offset(OFFSET)
             .commitLogOffset(0L)
             .build().encode();
-        messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, receiptHandle, MESSAGE_ID, OFFSET,
-            RECONSUME_TIMES, newInvisibleTime);
+        messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
+            RECONSUME_TIMES);
         String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
         receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID, newReceiptHandle, messageReceiptHandle);
         Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channelId))).thenReturn(Mockito.mock(ClientChannelInfo.class));
         SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
         Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig);
+        Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyLong()))
+            .thenReturn(CompletableFuture.completedFuture(new AckResult()));
         receiptHandleProcessor.scheduleRenewTask();
         Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1))
             .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
                 Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(groupConfig.getGroupRetryPolicy().getRetryPolicy().nextDelayDuration(RECONSUME_TIMES)));
+
+        ReceiptHandleGroup receiptHandleGroup = receiptHandleProcessor.receiptHandleGroupMap.values().stream().findFirst().get();
+        assertTrue(receiptHandleGroup.isEmpty());
+    }
+
+    /**
+     * Verifies that a handle built with retrieveTime 0 and invisibleTime 0 is dropped from its group, with no
+     * changeInvisibleTime call, when the metadata service returns no subscription group config for the group.
+     */
+    @Test
+    public void testRenewReceiptHandleWhenTimeoutWithNoSubscription() {
+        long newInvisibleTime = 0L;
+        String newReceiptHandle = ReceiptHandle.builder()
+            .startOffset(0L)
+            .retrieveTime(0)
+            .invisibleTime(newInvisibleTime)
+            .reviveQueueId(1)
+            .topicType(ReceiptHandle.NORMAL_TOPIC)
+            .brokerName(BROKER_NAME)
+            .queueId(QUEUE_ID)
+            .offset(OFFSET)
+            .commitLogOffset(0L)
+            .build().encode();
+        messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
+            RECONSUME_TIMES);
+        String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
+        receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID, newReceiptHandle, messageReceiptHandle);
+        Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channelId))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+        Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(null);
+        Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyLong()))
+            .thenReturn(CompletableFuture.completedFuture(new AckResult()));
+        // Trigger a single renew pass, then poll until the first tracked group reports empty (missing group = "not yet").
+        receiptHandleProcessor.scheduleRenewTask();
+        await().atMost(Duration.ofSeconds(1)).until(() -> receiptHandleProcessor.receiptHandleGroupMap.values().stream()
+            .findFirst().map(ReceiptHandleGroup::isEmpty).orElse(false));
+
+        // never() instead of timeout(1000).times(0): a timeout verification returns as soon as it passes, so it never waited.
+        Mockito.verify(messagingProcessor, Mockito.never())
+            .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.anyString(),
+                Mockito.anyString(), Mockito.anyString(), Mockito.anyLong());
     }
 
     @Test
@@ -158,7 +328,7 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
             .commitLogOffset(0L)
             .build().encode();
         messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
-            RECONSUME_TIMES, INVISIBLE_TIME);
+            RECONSUME_TIMES);
         String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
         receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID, newReceiptHandle, messageReceiptHandle);
         SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
@@ -173,8 +343,8 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
     @Test
     public void testRemoveReceiptHandle() {
         String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
-        receiptHandleProcessor.addReceiptHandle(channelId, MSG_ID, receiptHandle, messageReceiptHandle);
-        receiptHandleProcessor.removeReceiptHandle(channelId, MSG_ID, receiptHandle);
+        receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID, receiptHandle, messageReceiptHandle);
+        receiptHandleProcessor.removeReceiptHandle(channelId, GROUP, MSG_ID, receiptHandle);
         SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
         Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig);
         receiptHandleProcessor.scheduleRenewTask();
@@ -186,8 +356,8 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
     @Test
     public void testClearGroup() {
         String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
-        receiptHandleProcessor.addReceiptHandle(channelId, MSG_ID, receiptHandle, messageReceiptHandle);
-        receiptHandleProcessor.clearGroup(channelId);
+        receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID, receiptHandle, messageReceiptHandle);
+        receiptHandleProcessor.clearGroup(new ReceiptHandleProcessor.ReceiptHandleGroupKey(channelId, GROUP));
         SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
         Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig);
         receiptHandleProcessor.scheduleRenewTask();
@@ -195,4 +365,14 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
             .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
                 Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getInvisibleTimeMillisWhenClear()));
     }
+
+    @Test
+    public void testClientOffline() {
+        ArgumentCaptor<ConsumerIdsChangeListener> captor = ArgumentCaptor.forClass(ConsumerIdsChangeListener.class);
+        Mockito.verify(messagingProcessor).registerConsumerListener(captor.capture()); // exactly one registration is expected
+        String clientId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
+        receiptHandleProcessor.addReceiptHandle(clientId, GROUP, MSG_ID, receiptHandle, messageReceiptHandle);
+        captor.getValue().handle(ConsumerGroupEvent.CLIENT_UNREGISTER, GROUP, new ClientChannelInfo(null, clientId, LanguageCode.JAVA, 0));
+        assertTrue(receiptHandleProcessor.receiptHandleGroupMap.isEmpty()); // no handle group may survive the unregister event
+    }
 }
\ No newline at end of file