You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/20 11:57:04 UTC
[pulsar] 14/22: Future completed twice in the method of impl.MLPendingAckStore#closeAsync (#12362)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 86681ae42cc1e43eef8ede9aa6c33e4fd22fed00
Author: chenlin <15...@qq.com>
AuthorDate: Fri Oct 22 16:23:42 2021 +0800
Future completed twice in the method of impl.MLPendingAckStore#closeAsync (#12362)
(cherry picked from commit 10371ee870d26a079d7d744154e7c4f67ac09fae)
---
.../pendingack/impl/MLPendingAckStore.java | 23 ++++++++++++++++------
1 file changed, 17 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
index b60f4ba..fb88878 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
@@ -120,12 +120,23 @@ public class MLPendingAckStore implements PendingAckStore {
cursor.asyncClose(new AsyncCallbacks.CloseCallback() {
@Override
public void closeComplete(Object ctx) {
- try {
- managedLedger.close();
- } catch (Exception e) {
- completableFuture.completeExceptionally(e);
- }
- completableFuture.complete(null);
+ managedLedger.asyncClose(new AsyncCallbacks.CloseCallback() {
+
+ @Override
+ public void closeComplete(Object ctx) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}][{}] MLPendingAckStore closed successfully!", managedLedger.getName(), ctx);
+ }
+ completableFuture.complete(null);
+ }
+
+ @Override
+ public void closeFailed(ManagedLedgerException exception, Object ctx) {
+ log.error("[{}][{}] MLPendingAckStore closed failed,exception={}", managedLedger.getName(),
+ ctx, exception);
+ completableFuture.completeExceptionally(exception);
+ }
+ }, ctx);
}
@Override