You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/12/20 12:33:40 UTC
[pulsar] branch branch-2.9 updated: [fix][broker] Fix deadlock in PendingAckHandleImpl (#18989)
This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 30f29eb7683 [fix][broker] Fix deadlock in PendingAckHandleImpl (#18989)
30f29eb7683 is described below
commit 30f29eb7683afa252dcaf82bfe42ab7e46facd58
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Tue Dec 20 11:57:19 2022 +0100
[fix][broker] Fix deadlock in PendingAckHandleImpl (#18989)
(cherry picked from commit 22866bd19c231e85ddff4acee4dad1f895cbbc72)
(cherry picked from commit 2d958a9248ee79e27d99cf736d01e0e0a519c245)
---
.../pendingack/impl/PendingAckHandleImpl.java | 18 +++++++-----------
1 file changed, 7 insertions(+), 11 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 252e64dd55e..ce889860beb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -142,13 +142,13 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
.getBrokerService().getPulsar().getTransactionPendingAckStoreProvider();
pendingAckStoreProvider.checkInitializedBefore(persistentSubscription)
- .thenAccept(init -> {
+ .thenAcceptAsync(init -> {
if (init) {
initPendingAckStore();
} else {
completeHandleFuture();
}
- })
+ }, internalPinnedExecutor)
.exceptionally(e -> {
Throwable t = FutureUtil.unwrapCompletionException(e);
changeToErrorState();
@@ -906,16 +906,12 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
return transactionPendingAckStats;
}
- public synchronized void completeHandleFuture() {
- if (!this.pendingAckHandleCompletableFuture.isDone()) {
- this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this);
- }
+ public void completeHandleFuture() {
+ this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this);
}
- public synchronized void exceptionHandleFuture(Throwable t) {
- if (!this.pendingAckHandleCompletableFuture.isDone()) {
- this.pendingAckHandleCompletableFuture.completeExceptionally(t);
- }
+ public void exceptionHandleFuture(Throwable t) {
+ this.pendingAckHandleCompletableFuture.completeExceptionally(t);
}
@Override
@@ -1000,4 +996,4 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
}
}
}
-}
\ No newline at end of file
+}