You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/02/10 04:04:28 UTC
[pulsar] 05/13: Fix OpBase.callback is not called in failPendingRequest (#14133)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 33448a74f0357befd8d11056a77757eb08fc3240
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Mon Feb 7 18:28:41 2022 +0800
Fix OpBase.callback is not called in failPendingRequest (#14133)
(cherry picked from commit 49490dbd2439328628b62a0d7c9ebcb152c6c5ab)
---
.../client/impl/TransactionMetaStoreHandler.java | 20 +++++++++-----------
1 file changed, 9 insertions(+), 11 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index 3c6286b..a30c235 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -123,7 +123,6 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
if (getState() == State.Closing || getState() == State.Closed) {
setState(State.Closed);
failPendingRequest();
- this.pendingRequests.clear();
return;
}
@@ -173,17 +172,16 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
}
private void failPendingRequest() {
- internalPinnedExecutor.execute(() -> {
- pendingRequests.keys().forEach(k -> {
- OpBase<?> op = pendingRequests.remove(k);
- if (op != null && !op.callback.isDone()) {
- op.callback.completeExceptionally(new PulsarClientException.AlreadyClosedException(
- "Could not get response from transaction meta store when " +
- "the transaction meta store has already close."));
- onResponse(op);
- }
- });
+ // this method is executed in internalPinnedExecutor.
+ pendingRequests.forEach((k, op) -> {
+ if (op != null && !op.callback.isDone()) {
+ op.callback.completeExceptionally(new PulsarClientException.AlreadyClosedException(
+ "Could not get response from transaction meta store when "
+ + "the transaction meta store has already close."));
+ onResponse(op);
+ }
});
+ this.pendingRequests.clear();
}
public CompletableFuture<TxnID> newTransactionAsync(long timeout, TimeUnit unit) {