You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2023/01/04 07:06:40 UTC
[rocketmq] branch develop updated: [ISSUE #5813] Fix topic queue lock not unlock in pop consumption mode (#5815)
This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 348f58a5b [ISSUE #5813] Fix topic queue lock not unlock in pop consumption mode (#5815)
348f58a5b is described below
commit 348f58a5b1e582386c7d734064771356d07185bd
Author: SSpirits <ad...@lv5.moe>
AuthorDate: Wed Jan 4 15:06:23 2023 +0800
[ISSUE #5813] Fix topic queue lock not unlock in pop consumption mode (#5815)
---
.../rocketmq/broker/processor/PopMessageProcessor.java | 15 ++++++++++-----
1 file changed, 10 insertions(+), 5 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 6fe7b6782..0f4de599a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -525,17 +525,21 @@ public class PopMessageProcessor implements NettyRequestProcessor {
boolean isOrder = requestHeader.isOrder();
long offset = getPopOffset(topic, requestHeader.getConsumerGroup(), queueId, requestHeader.getInitMode(),
false, lockKey, false);
+ CompletableFuture<Long> future = new CompletableFuture<>();
if (!queueLockManager.tryLock(lockKey)) {
restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
- return CompletableFuture.completedFuture(restNum);
+ future.complete(restNum);
+ return future;
}
try {
+ future.whenComplete((result, throwable) -> queueLockManager.unLock(lockKey));
offset = getPopOffset(topic, requestHeader.getConsumerGroup(), queueId, requestHeader.getInitMode(),
true, lockKey, true);
if (isOrder && brokerController.getConsumerOrderInfoManager().checkBlock(topic,
requestHeader.getConsumerGroup(), queueId, requestHeader.getInvisibleTime())) {
- return CompletableFuture.completedFuture(this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum);
+ future.complete(this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum);
+ return future;
}
if (isOrder) {
@@ -548,12 +552,13 @@ public class PopMessageProcessor implements NettyRequestProcessor {
if (getMessageResult.getMessageMapedList().size() >= requestHeader.getMaxMsgNums()) {
restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
- return CompletableFuture.completedFuture(restNum);
+ future.complete(restNum);
+ return future;
}
} catch (Exception e) {
POP_LOGGER.error("Exception in popMsgFromQueue", e);
- queueLockManager.unLock(lockKey);
- return CompletableFuture.completedFuture(restNum);
+ future.complete(restNum);
+ return future;
}
AtomicLong atomicRestNum = new AtomicLong(restNum);