You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/12/12 09:10:30 UTC
[pulsar] branch branch-2.11 updated: [fix][txn] Fix PendingAckHandleImpl when `pendingAckStoreProvider.checkInitializedBefore` failed (#18859)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 0b5f31dbf34 [fix][txn] Fix PendingAckHandleImpl when `pendingAckStoreProvider.checkInitializedBefore` failed (#18859)
0b5f31dbf34 is described below
commit 0b5f31dbf34d9676004196ee872078cc01f88d33
Author: Tao Jiuming <95...@users.noreply.github.com>
AuthorDate: Mon Dec 12 16:54:28 2022 +0800
[fix][txn] Fix PendingAckHandleImpl when `pendingAckStoreProvider.checkInitializedBefore` failed (#18859)
---
.../pendingack/impl/PendingAckHandleImpl.java | 21 ++++++++++++++-------
1 file changed, 14 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index b3aec6c67f5..38d81d059b7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -144,13 +144,20 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
this.pendingAckStoreProvider = this.persistentSubscription.getTopic()
.getBrokerService().getPulsar().getTransactionPendingAckStoreProvider();
- pendingAckStoreProvider.checkInitializedBefore(persistentSubscription).thenAccept(init -> {
- if (init) {
- initPendingAckStore();
- } else {
- completeHandleFuture();
- }
- });
+
+ pendingAckStoreProvider.checkInitializedBefore(persistentSubscription)
+ .thenAccept(init -> {
+ if (init) {
+ initPendingAckStore();
+ } else {
+ completeHandleFuture();
+ }
+ })
+ .exceptionally(t -> {
+ changeToErrorState();
+ exceptionHandleFuture(t);
+ return null;
+ });
}
private void initPendingAckStore() {