You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/12/12 09:10:30 UTC

[pulsar] branch branch-2.11 updated: [fix][txn] Fix PendingAckHandleImpl when `pendingAckStoreProvider.checkInitializedBefore` failed (#18859)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 0b5f31dbf34 [fix][txn] Fix PendingAckHandleImpl when `pendingAckStoreProvider.checkInitializedBefore` failed (#18859)
0b5f31dbf34 is described below

commit 0b5f31dbf34d9676004196ee872078cc01f88d33
Author: Tao Jiuming <95...@users.noreply.github.com>
AuthorDate: Mon Dec 12 16:54:28 2022 +0800

    [fix][txn] Fix PendingAckHandleImpl when `pendingAckStoreProvider.checkInitializedBefore` failed (#18859)
---
 .../pendingack/impl/PendingAckHandleImpl.java       | 21 ++++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index b3aec6c67f5..38d81d059b7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -144,13 +144,20 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
 
         this.pendingAckStoreProvider = this.persistentSubscription.getTopic()
                         .getBrokerService().getPulsar().getTransactionPendingAckStoreProvider();
-        pendingAckStoreProvider.checkInitializedBefore(persistentSubscription).thenAccept(init -> {
-            if (init) {
-                initPendingAckStore();
-            } else {
-                completeHandleFuture();
-            }
-        });
+
+        pendingAckStoreProvider.checkInitializedBefore(persistentSubscription)
+                .thenAccept(init -> {
+                    if (init) {
+                        initPendingAckStore();
+                    } else {
+                        completeHandleFuture();
+                    }
+                })
+                .exceptionally(t -> {
+                    changeToErrorState();
+                    exceptionHandleFuture(t);
+                    return null;
+                });
     }
 
     private void initPendingAckStore() {