You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/20 11:57:04 UTC

[pulsar] 14/22: Fix Future being completed twice in impl.MLPendingAckStore#closeAsync (#12362)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 86681ae42cc1e43eef8ede9aa6c33e4fd22fed00
Author: chenlin <15...@qq.com>
AuthorDate: Fri Oct 22 16:23:42 2021 +0800

    Fix Future being completed twice in impl.MLPendingAckStore#closeAsync (#12362)
    
    (cherry picked from commit 10371ee870d26a079d7d744154e7c4f67ac09fae)
---
 .../pendingack/impl/MLPendingAckStore.java         | 23 ++++++++++++++++------
 1 file changed, 17 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
index b60f4ba..fb88878 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
@@ -120,12 +120,23 @@ public class MLPendingAckStore implements PendingAckStore {
         cursor.asyncClose(new AsyncCallbacks.CloseCallback() {
             @Override
             public void closeComplete(Object ctx) {
-                try {
-                    managedLedger.close();
-                } catch (Exception e) {
-                    completableFuture.completeExceptionally(e);
-                }
-                completableFuture.complete(null);
+                managedLedger.asyncClose(new AsyncCallbacks.CloseCallback() {
+
+                    @Override
+                    public void closeComplete(Object ctx) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}][{}] MLPendingAckStore closed successfully!", managedLedger.getName(), ctx);
+                        }
+                        completableFuture.complete(null);
+                    }
+
+                    @Override
+                    public void closeFailed(ManagedLedgerException exception, Object ctx) {
+                        log.error("[{}][{}] MLPendingAckStore closed failed,exception={}", managedLedger.getName(),
+                                ctx, exception);
+                        completableFuture.completeExceptionally(exception);
+                    }
+                }, ctx);
             }
 
             @Override