You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/12/16 09:51:12 UTC
[pulsar] branch branch-2.9 updated: [fix][txn] Fix PendingAckHandleImpl when `pendingAckStoreProvider.checkInitializedBefore` failed (#18859)
This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 451b0d7f1d6 [fix][txn] Fix PendingAckHandleImpl when `pendingAckStoreProvider.checkInitializedBefore` failed (#18859)
451b0d7f1d6 is described below
commit 451b0d7f1d684022b195dc090b95025a09f8187f
Author: Tao Jiuming <95...@users.noreply.github.com>
AuthorDate: Mon Dec 12 16:54:28 2022 +0800
[fix][txn] Fix PendingAckHandleImpl when `pendingAckStoreProvider.checkInitializedBefore` failed (#18859)
(cherry picked from commit 1be5a69a079594d8d96d2a0ab7ab8b389da8865e)
---
.../pendingack/impl/PendingAckHandleImpl.java | 21 ++++++++++++++-------
1 file changed, 14 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 41ef25b3e4d..a5b2ec90426 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -140,13 +140,20 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
this.pendingAckStoreProvider = this.persistentSubscription.getTopic()
.getBrokerService().getPulsar().getTransactionPendingAckStoreProvider();
- pendingAckStoreProvider.checkInitializedBefore(persistentSubscription).thenAccept(init -> {
- if (init) {
- initPendingAckStore();
- } else {
- completeHandleFuture();
- }
- });
+
+ pendingAckStoreProvider.checkInitializedBefore(persistentSubscription)
+ .thenAccept(init -> {
+ if (init) {
+ initPendingAckStore();
+ } else {
+ completeHandleFuture();
+ }
+ })
+ .exceptionally(t -> {
+ changeToErrorState();
+ exceptionHandleFuture(t);
+ return null;
+ });
}
private void initPendingAckStore() {