You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2023/01/04 07:06:40 UTC

[rocketmq] branch develop updated: [ISSUE #5813] Fix topic queue lock not unlock in pop consumption mode (#5815)

This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 348f58a5b [ISSUE #5813] Fix topic queue lock not unlock in pop consumption mode (#5815)
348f58a5b is described below

commit 348f58a5b1e582386c7d734064771356d07185bd
Author: SSpirits <ad...@lv5.moe>
AuthorDate: Wed Jan 4 15:06:23 2023 +0800

    [ISSUE #5813] Fix topic queue lock not unlock in pop consumption mode (#5815)
---
 .../rocketmq/broker/processor/PopMessageProcessor.java    | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 6fe7b6782..0f4de599a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -525,17 +525,21 @@ public class PopMessageProcessor implements NettyRequestProcessor {
         boolean isOrder = requestHeader.isOrder();
         long offset = getPopOffset(topic, requestHeader.getConsumerGroup(), queueId, requestHeader.getInitMode(),
             false, lockKey, false);
+        CompletableFuture<Long> future = new CompletableFuture<>();
         if (!queueLockManager.tryLock(lockKey)) {
             restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
-            return CompletableFuture.completedFuture(restNum);
+            future.complete(restNum);
+            return future;
         }
 
         try {
+            future.whenComplete((result, throwable) -> queueLockManager.unLock(lockKey));
             offset = getPopOffset(topic, requestHeader.getConsumerGroup(), queueId, requestHeader.getInitMode(),
                 true, lockKey, true);
             if (isOrder && brokerController.getConsumerOrderInfoManager().checkBlock(topic,
                 requestHeader.getConsumerGroup(), queueId, requestHeader.getInvisibleTime())) {
-                return CompletableFuture.completedFuture(this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum);
+                future.complete(this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum);
+                return future;
             }
 
             if (isOrder) {
@@ -548,12 +552,13 @@ public class PopMessageProcessor implements NettyRequestProcessor {
 
             if (getMessageResult.getMessageMapedList().size() >= requestHeader.getMaxMsgNums()) {
                 restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
-                return CompletableFuture.completedFuture(restNum);
+                future.complete(restNum);
+                return future;
             }
         } catch (Exception e) {
             POP_LOGGER.error("Exception in popMsgFromQueue", e);
-            queueLockManager.unLock(lockKey);
-            return CompletableFuture.completedFuture(restNum);
+            future.complete(restNum);
+            return future;
         }
 
         AtomicLong atomicRestNum = new AtomicLong(restNum);