You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/12/20 12:33:40 UTC

[pulsar] branch branch-2.9 updated: [fix][broker] Fix deadlock in PendingAckHandleImpl (#18989)

This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 30f29eb7683 [fix][broker] Fix deadlock in PendingAckHandleImpl (#18989)
30f29eb7683 is described below

commit 30f29eb7683afa252dcaf82bfe42ab7e46facd58
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Tue Dec 20 11:57:19 2022 +0100

    [fix][broker] Fix deadlock in PendingAckHandleImpl (#18989)
    
    (cherry picked from commit 22866bd19c231e85ddff4acee4dad1f895cbbc72)
    (cherry picked from commit 2d958a9248ee79e27d99cf736d01e0e0a519c245)
---
 .../pendingack/impl/PendingAckHandleImpl.java          | 18 +++++++-----------
 1 file changed, 7 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 252e64dd55e..ce889860beb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -142,13 +142,13 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
                         .getBrokerService().getPulsar().getTransactionPendingAckStoreProvider();
 
         pendingAckStoreProvider.checkInitializedBefore(persistentSubscription)
-                .thenAccept(init -> {
+                .thenAcceptAsync(init -> {
                     if (init) {
                         initPendingAckStore();
                     } else {
                         completeHandleFuture();
                     }
-                })
+                }, internalPinnedExecutor)
                 .exceptionally(e -> {
                     Throwable t = FutureUtil.unwrapCompletionException(e);
                     changeToErrorState();
@@ -906,16 +906,12 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
         return transactionPendingAckStats;
     }
 
-    public synchronized void completeHandleFuture() {
-        if (!this.pendingAckHandleCompletableFuture.isDone()) {
-            this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this);
-        }
+    public void completeHandleFuture() {
+        this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this);
     }
 
-    public synchronized void exceptionHandleFuture(Throwable t) {
-        if (!this.pendingAckHandleCompletableFuture.isDone()) {
-            this.pendingAckHandleCompletableFuture.completeExceptionally(t);
-        }
+    public void exceptionHandleFuture(Throwable t) {
+        this.pendingAckHandleCompletableFuture.completeExceptionally(t);
     }
 
     @Override
@@ -1000,4 +996,4 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
             }
         }
     }
-}
\ No newline at end of file
+}