You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/02/25 08:04:49 UTC
[pulsar] branch branch-2.10 updated: Fix producerFuture not complete in ServerCnx#handleProducer (#14467)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new e56d2fd Fix producerFuture not complete in ServerCnx#handleProducer (#14467)
e56d2fd is described below
commit e56d2fd7c47badfc920c9b54295eb047357b95b4
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Fri Feb 25 15:56:55 2022 +0800
Fix producerFuture not complete in ServerCnx#handleProducer (#14467)
(cherry picked from commit e937da92a5c6b79f44436720a81762b0ce6a8139)
---
.../java/org/apache/pulsar/broker/service/ServerCnx.java | 15 ++++++++++-----
1 file changed, 10 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index f216902..445c618 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1286,8 +1286,10 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
"Failed to create the initial subscription: " + ex.getCause().getMessage();
log.warn("[{}] {} initialSubscriptionName: {}, topic: {}",
remoteAddress, msg, initialSubscriptionName, topicName);
- commandSender.sendErrorResponse(requestId,
- BrokerServiceException.getClientErrorCode(ex), msg);
+ if (producerFuture.completeExceptionally(ex)) {
+ commandSender.sendErrorResponse(requestId,
+ BrokerServiceException.getClientErrorCode(ex), msg);
+ }
producers.remove(producerId, producerFuture);
return;
}
@@ -1300,9 +1302,12 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
Throwable cause = exception.getCause();
log.error("producerId {}, requestId {} : TransactionBuffer recover failed",
producerId, requestId, exception);
- commandSender.sendErrorResponse(requestId,
- ServiceUnitNotReadyException.getClientErrorCode(cause),
- cause.getMessage());
+ if (producerFuture.completeExceptionally(exception)) {
+ commandSender.sendErrorResponse(requestId,
+ ServiceUnitNotReadyException.getClientErrorCode(cause),
+ cause.getMessage());
+ }
+ producers.remove(producerId, producerFuture);
return null;
});
});