You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/12/20 12:33:17 UTC
[pulsar] branch branch-2.10 updated: [fix][broker] Fix deadlock in PendingAckHandleImpl (#18989)
This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 2d958a9248e [fix][broker] Fix deadlock in PendingAckHandleImpl (#18989)
2d958a9248e is described below
commit 2d958a9248ee79e27d99cf736d01e0e0a519c245
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Tue Dec 20 11:57:19 2022 +0100
[fix][broker] Fix deadlock in PendingAckHandleImpl (#18989)
(cherry picked from commit 22866bd19c231e85ddff4acee4dad1f895cbbc72)
---
.../pendingack/impl/PendingAckHandleImpl.java | 18 +++++++-----------
1 file changed, 7 insertions(+), 11 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 6bdc1ee03a4..8d143eb1ddb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -143,13 +143,13 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
.getBrokerService().getPulsar().getTransactionPendingAckStoreProvider();
pendingAckStoreProvider.checkInitializedBefore(persistentSubscription)
- .thenAccept(init -> {
+ .thenAcceptAsync(init -> {
if (init) {
initPendingAckStore();
} else {
completeHandleFuture();
}
- })
+ }, internalPinnedExecutor)
.exceptionally(e -> {
Throwable t = FutureUtil.unwrapCompletionException(e);
changeToErrorState();
@@ -908,16 +908,12 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
return transactionPendingAckStats;
}
- public synchronized void completeHandleFuture() {
- if (!this.pendingAckHandleCompletableFuture.isDone()) {
- this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this);
- }
+ public void completeHandleFuture() {
+ this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this);
}
- public synchronized void exceptionHandleFuture(Throwable t) {
- if (!this.pendingAckHandleCompletableFuture.isDone()) {
- this.pendingAckHandleCompletableFuture.completeExceptionally(t);
- }
+ public void exceptionHandleFuture(Throwable t) {
+ this.pendingAckHandleCompletableFuture.completeExceptionally(t);
}
@Override
@@ -1002,4 +998,4 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
}
}
}
-}
\ No newline at end of file
+}