You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/02/10 04:04:28 UTC

[pulsar] 05/13: Fix OpBase.callback is not called in failPendingRequest (#14133)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 33448a74f0357befd8d11056a77757eb08fc3240
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Mon Feb 7 18:28:41 2022 +0800

    Fix OpBase.callback is not called in failPendingRequest (#14133)
    
    (cherry picked from commit 49490dbd2439328628b62a0d7c9ebcb152c6c5ab)
---
 .../client/impl/TransactionMetaStoreHandler.java     | 20 +++++++++-----------
 1 file changed, 9 insertions(+), 11 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index 3c6286b..a30c235 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -123,7 +123,6 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
             if (getState() == State.Closing || getState() == State.Closed) {
                 setState(State.Closed);
                 failPendingRequest();
-                this.pendingRequests.clear();
                 return;
             }
 
@@ -173,17 +172,16 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
     }
 
     private void failPendingRequest() {
-        internalPinnedExecutor.execute(() -> {
-            pendingRequests.keys().forEach(k -> {
-                OpBase<?> op = pendingRequests.remove(k);
-                if (op != null && !op.callback.isDone()) {
-                    op.callback.completeExceptionally(new PulsarClientException.AlreadyClosedException(
-                            "Could not get response from transaction meta store when " +
-                                    "the transaction meta store has already close."));
-                    onResponse(op);
-                }
-            });
+        // this method is executed in internalPinnedExecutor.
+        pendingRequests.forEach((k, op) -> {
+            if (op != null && !op.callback.isDone()) {
+                op.callback.completeExceptionally(new PulsarClientException.AlreadyClosedException(
+                        "Could not get response from transaction meta store when "
+                                + "the transaction meta store has already close."));
+                onResponse(op);
+            }
         });
+        this.pendingRequests.clear();
     }
 
     public CompletableFuture<TxnID> newTransactionAsync(long timeout, TimeUnit unit) {