Posted to commits@pulsar.apache.org by xy...@apache.org on 2021/09/02 14:07:57 UTC

[pulsar] branch master updated: Fix incorrect connect timeout (#11889)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cdd0f41  Fix incorrect connect timeout (#11889)
cdd0f41 is described below

commit cdd0f41f90ee0a8a03201adf759af056b065a547
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Sep 2 22:07:11 2021 +0800

    Fix incorrect connect timeout (#11889)
    
    Fixes #10721
    
    ### Motivation
    
    Currently the connection timer stops immediately after the TCP connection is established (i.e. when the state becomes `TcpConnected`). However, the connect phase should also include sending the `CommandConnect` request and successfully receiving the `CommandConnected` response from the broker. For example, in the case described in #10721, if the Pulsar broker receives a SIGSTOP signal and is suspended, the TCP connection can still be established, but no response will be received from the stopped broker.  [...]
    
    ### Modifications
    
    Stop the connection timer only after the `ClientConnection`'s state becomes `Ready`, which means the client has successfully received the `CommandConnected` response. This is also consistent with the Java client's implementation; see https://github.com/apache/pulsar/blob/235e678a56d0284e68b45e46706b6237d7c6d5f9/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java#L329-L330
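    
    The change can be pictured as a small state machine. The sketch below is a simplified, hypothetical model (`ConnectionModel`, `onConnectTimeout` and the other names are illustrative, not the real `ClientConnection` API): the timer is armed when the connect attempt starts, keeps running past `TcpConnected`, and is stopped only once `CommandConnected` moves the state to `Ready`.
    
    ```cpp
    #include <cassert>
    #include <iostream>
    
    // Hypothetical, simplified model of the states named in this commit; the real
    // ClientConnection has more states and drives its timer with Boost.Asio.
    enum class State { Pending, TcpConnected, Ready, Disconnected };
    
    struct ConnectionModel {
        State state = State::Pending;
        bool timerRunning = true;  // armed when the connect attempt starts
    
        // TCP handshake done: after the fix, the connect timer keeps running.
        void onTcpConnected() { state = State::TcpConnected; }
    
        // CommandConnected received: the connection is usable, so stop the timer.
        void onPulsarConnected() {
            state = State::Ready;
            timerRunning = false;
        }
    
        // Timer expiry. Returns true if the connection is closed as timed out.
        bool onConnectTimeout() {
            if (!timerRunning) {
                return false;  // in this model a stopped timer never reports a timeout
            }
            if (state != State::Ready) {  // was `state != TcpConnected` before the fix
                state = State::Disconnected;
                return true;
            }
            return false;
        }
    };
    
    // Broker accepts TCP but never answers CommandConnect (e.g. after SIGSTOP).
    bool stoppedBrokerTimesOut() {
        ConnectionModel cnx;
        cnx.onTcpConnected();
        return cnx.onConnectTimeout();
    }
    
    // Broker completes the Pulsar handshake before the timer fires.
    bool healthyBrokerTimesOut() {
        ConnectionModel cnx;
        cnx.onTcpConnected();
        cnx.onPulsarConnected();
        return cnx.onConnectTimeout();
    }
    
    int main() {
        assert(stoppedBrokerTimesOut());
        assert(!healthyBrokerTimesOut());
        std::cout << std::boolalpha << "stopped broker closed: " << stoppedBrokerTimesOut() << '\n'
                  << "healthy broker closed: " << healthyBrokerTimesOut() << '\n';
    }
    ```
    
    In this model, the pre-fix behaviour corresponds to stopping the timer inside `onTcpConnected()`; `stoppedBrokerTimesOut()` would then return `false`, i.e. a stopped broker would never trigger the timeout.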
    
    ### Verifying this change
    
    - [ ] Make sure that the change passes the CI checks.
    
    It's hard to simulate this scenario in a unit test, but it can easily be reproduced in a local environment:
    1. Run a Pulsar standalone instance and send it a `SIGSTOP` signal with `pkill -SIGSTOP -f pulsar`.
    2. Run any C++ or Python client that includes this fix and connect it to the standalone instance.
    
    ```
    >>> import pulsar
    >>> c = pulsar.Client('pulsar://localhost:6650')
    >>> c.create_producer('xxx')
    2021-09-02 11:54:38.828 INFO  [0x10d72ae00] ClientConnection:181 | [<none> -> pulsar://localhost:6650] Create ClientConnection, timeout=10000
    2021-09-02 11:54:38.828 INFO  [0x10d72ae00] ConnectionPool:96 | Created connection for pulsar://localhost:6650
    2021-09-02 11:54:38.969 INFO  [0x700003136000] ClientConnection:367 | [127.0.0.1:62784 -> 127.0.0.1:6650] Connected to broker
    2021-09-02 11:54:48.973 ERROR [0x700003136000] ClientConnection:532 | [127.0.0.1:62784 -> 127.0.0.1:6650] Connection was not established in 10000 ms, close the socket
    2021-09-02 11:54:48.973 ERROR [0x700003136000] ClientConnection:572 | [127.0.0.1:62784 -> 127.0.0.1:6650] Read failed: Operation canceled
    2021-09-02 11:54:48.973 INFO  [0x700003136000] ClientConnection:1495 | [127.0.0.1:62784 -> 127.0.0.1:6650] Connection closed
    2021-09-02 11:54:48.973 ERROR [0x700003136000] ClientImpl:188 | Error Checking/Getting Partition Metadata while creating producer on persistent://public/default/xxx -- ConnectError
    Traceback (most recent call last):
        # ...
    _pulsar.ConnectError: Pulsar error: ConnectError
    ```
    
    As the log shows, after 10 seconds (the default connect timeout) the connection is closed and creating the producer fails with `ConnectError`.
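    
    The reproduction works because the kernel still completes TCP handshakes on a stopped process's listening socket, which is why the log reports `Connected to broker` even though the broker never answers. Below is a minimal POSIX sketch of the same situation, assuming Linux or macOS and using hypothetical helper names (`makeSilentListener`, `connectAndAwaitReply`): a listener that never calls `accept()` stands in for the SIGSTOP'd broker, and the client's wait for a reply is bounded by a timeout, which is the part of the connect phase the fixed timer now covers.
    
    ```cpp
    #include <arpa/inet.h>
    #include <netinet/in.h>
    #include <poll.h>
    #include <sys/socket.h>
    #include <unistd.h>
    
    #include <cassert>
    #include <cstdint>
    #include <cstdio>
    
    // Builds a loopback IPv4 address for the given port (0 = let the OS choose).
    static sockaddr_in loopback(uint16_t port) {
        sockaddr_in addr{};
        addr.sin_family = AF_INET;
        addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
        addr.sin_port = htons(port);
        return addr;
    }
    
    // Hypothetical stand-in for a SIGSTOP'd broker: it listens but never calls
    // accept() or recv(), so handshakes complete in the kernel and nothing answers.
    // Returns the listening fd (and its port via *port), or -1 on error.
    int makeSilentListener(uint16_t* port) {
        int fd = socket(AF_INET, SOCK_STREAM, 0);
        if (fd < 0) return -1;
        sockaddr_in addr = loopback(0);
        socklen_t len = sizeof(addr);
        if (bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) != 0 || listen(fd, 8) != 0 ||
            getsockname(fd, reinterpret_cast<sockaddr*>(&addr), &len) != 0) {
            close(fd);
            return -1;
        }
        *port = ntohs(addr.sin_port);
        return fd;
    }
    
    // Returns 1 if TCP connect succeeded but nothing arrived within timeoutMs,
    // 0 if data or EOF arrived, and -1 on a socket error.
    int connectAndAwaitReply(uint16_t port, int timeoutMs) {
        int fd = socket(AF_INET, SOCK_STREAM, 0);
        if (fd < 0) return -1;
        sockaddr_in addr = loopback(port);
        if (connect(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) != 0) {
            close(fd);
            return -1;
        }
        // TCP is established here: the point where the old code stopped the timer.
        const char request[] = "hello";  // placeholder bytes, not a real CommandConnect frame
        if (send(fd, request, sizeof(request) - 1, 0) < 0) {
            close(fd);
            return -1;
        }
        pollfd pfd{fd, POLLIN, 0};
        int ready = poll(&pfd, 1, timeoutMs);  // the connect timeout must bound this wait too
        close(fd);
        if (ready < 0) return -1;
        return ready == 0 ? 1 : 0;
    }
    
    int main() {
        uint16_t port = 0;
        int listener = makeSilentListener(&port);
        assert(listener >= 0);
        int result = connectAndAwaitReply(port, 200);
        std::printf("result = %d (1: TCP connected, but no reply before the timeout)\n", result);
        close(listener);
        return result == 1 ? 0 : 1;
    }
    ```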
---
 pulsar-client-cpp/lib/ClientConnection.cc | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index fc40981..fc3582d 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -268,6 +268,7 @@ void ClientConnection::handlePulsarConnected(const CommandConnected& cmdConnecte
     }
 
     state_ = Ready;
+    connectTimeoutTask_->stop();
     serverProtocolVersion_ = cmdConnected.protocol_version();
     connectPromise_.setValue(shared_from_this());
 
@@ -368,7 +369,6 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
             LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_);
         }
         state_ = TcpConnected;
-        connectTimeoutTask_->stop();
 
         boost::system::error_code error;
         socket_->set_option(tcp::no_delay(true), error);
@@ -527,7 +527,7 @@ void ClientConnection::handleResolve(const boost::system::error_code& err,
 
     auto self = shared_from_this();
     connectTimeoutTask_->setCallback([this, self](const PeriodicTask::ErrorCode& ec) {
-        if (state_ != TcpConnected) {
+        if (state_ != Ready) {
             LOG_ERROR(cnxString_ << "Connection was not established in " << connectTimeoutTask_->getPeriodMs()
                                  << " ms, close the socket");
             PeriodicTask::ErrorCode err;