You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/02/25 08:04:49 UTC

[pulsar] branch branch-2.10 updated: Fix producerFuture not complete in ServerCnx#handleProducer (#14467)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new e56d2fd  Fix producerFuture not complete in ServerCnx#handleProducer (#14467)
e56d2fd is described below

commit e56d2fd7c47badfc920c9b54295eb047357b95b4
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Fri Feb 25 15:56:55 2022 +0800

    Fix producerFuture not complete in ServerCnx#handleProducer (#14467)
    
    (cherry picked from commit e937da92a5c6b79f44436720a81762b0ce6a8139)
---
 .../java/org/apache/pulsar/broker/service/ServerCnx.java  | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index f216902..445c618 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1286,8 +1286,10 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                                             "Failed to create the initial subscription: " + ex.getCause().getMessage();
                                     log.warn("[{}] {} initialSubscriptionName: {}, topic: {}",
                                             remoteAddress, msg, initialSubscriptionName, topicName);
-                                    commandSender.sendErrorResponse(requestId,
-                                            BrokerServiceException.getClientErrorCode(ex), msg);
+                                    if (producerFuture.completeExceptionally(ex)) {
+                                        commandSender.sendErrorResponse(requestId,
+                                                BrokerServiceException.getClientErrorCode(ex), msg);
+                                    }
                                     producers.remove(producerId, producerFuture);
                                     return;
                                 }
@@ -1300,9 +1302,12 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                             Throwable cause = exception.getCause();
                             log.error("producerId {}, requestId {} : TransactionBuffer recover failed",
                                     producerId, requestId, exception);
-                            commandSender.sendErrorResponse(requestId,
-                                    ServiceUnitNotReadyException.getClientErrorCode(cause),
-                                    cause.getMessage());
+                            if (producerFuture.completeExceptionally(exception)) {
+                                commandSender.sendErrorResponse(requestId,
+                                        ServiceUnitNotReadyException.getClientErrorCode(cause),
+                                        cause.getMessage());
+                            }
+                            producers.remove(producerId, producerFuture);
                             return null;
                         });
                     });