Posted to commits@pulsar.apache.org by xy...@apache.org on 2021/09/02 14:07:57 UTC
[pulsar] branch master updated: Fix incorrect connect timeout (#11889)
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new cdd0f41 Fix incorrect connect timeout (#11889)
cdd0f41 is described below
commit cdd0f41f90ee0a8a03201adf759af056b065a547
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Sep 2 22:07:11 2021 +0800
Fix incorrect connect timeout (#11889)
Fixes #10721
### Motivation
Currently the connection timer stops immediately after the TCP connection is established (the state becomes `TcpConnected`). However, the connect phase should also include sending the `CommandConnect` request and receiving the `CommandConnected` response from the broker. For example, in the case described in #10721, if the Pulsar broker receives a SIGSTOP signal and is suspended, the TCP connection can still be established but no response will ever be received from the stopped broker. [...]
### Modifications
Stop the connection timer only after the `ClientConnection`'s state becomes `Ready`, which means the client has successfully received the `CommandConnected` response. This is also consistent with the Java client's implementation, see https://github.com/apache/pulsar/blob/235e678a56d0284e68b45e46706b6237d7c6d5f9/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java#L329-L330
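The intended behavior can be illustrated with a small standalone model. This is only a sketch, not the actual client code: `FakeTimer`, `FakeConnection`, and the two scenario functions are hypothetical names invented for illustration, and the timer is fired manually instead of by a real clock. It shows that the timer keeps running through `TcpConnected` and is stopped only once the state becomes `Ready`.

```cpp
#include <functional>
#include <utility>

// Hypothetical, simplified connection states (the real client has more).
enum class State { Pending, TcpConnected, Ready, Disconnected };

// Hypothetical stand-in for the connect timer; NOT the real PeriodicTask.
class FakeTimer {
   public:
    void setCallback(std::function<void()> cb) { callback_ = std::move(cb); }
    void start() { running_ = true; }
    void stop() { running_ = false; }
    // Simulate the deadline elapsing; the callback runs only if still armed.
    void expire() {
        if (running_ && callback_) {
            running_ = false;
            callback_();
        }
    }

   private:
    std::function<void()> callback_;
    bool running_ = false;
};

// Hypothetical model of the connect phase described in this commit.
class FakeConnection {
   public:
    FakeConnection() {
        // On timeout, give up unless the full handshake has completed.
        timer_.setCallback([this] {
            if (state_ != State::Ready) {
                state_ = State::Disconnected;
            }
        });
    }
    FakeConnection(const FakeConnection&) = delete;
    FakeConnection& operator=(const FakeConnection&) = delete;

    void startConnect() {
        state_ = State::Pending;
        timer_.start();
    }
    // TCP handshake done: the timer is deliberately left running.
    void onTcpConnected() { state_ = State::TcpConnected; }
    // CommandConnected received: only now is the timer stopped.
    void onPulsarConnected() {
        state_ = State::Ready;
        timer_.stop();
    }
    void onDeadline() { timer_.expire(); }
    State state() const { return state_; }

   private:
    State state_ = State::Pending;
    FakeTimer timer_;
};

// Scenario 1: the broker accepts TCP but never answers (e.g. it is stopped).
bool stalledBrokerTimesOut() {
    FakeConnection cnx;
    cnx.startConnect();
    cnx.onTcpConnected();
    cnx.onDeadline();
    return cnx.state() == State::Disconnected;
}

// Scenario 2: the broker answers in time, so a later deadline is a no-op.
bool responsiveBrokerStaysReady() {
    FakeConnection cnx;
    cnx.startConnect();
    cnx.onTcpConnected();
    cnx.onPulsarConnected();
    cnx.onDeadline();
    return cnx.state() == State::Ready;
}
```

In this model, stopping the timer in `onTcpConnected()` instead would leave the first scenario stuck in `TcpConnected` forever, which corresponds to the behavior reported in #10721.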
### Verifying this change
- [ ] Make sure that the change passes the CI checks.
It's hard to simulate this scenario in a unit test, but it can easily be reproduced in a local environment.
1. Run a Pulsar standalone and send it a `SIGSTOP` signal with `pkill -SIGSTOP -f pulsar`.
2. Run any C++/Python client with this fix to connect to the standalone.
```
>>> import pulsar
>>> c = pulsar.Client('pulsar://localhost:6650')
>>> c.create_producer('xxx')
2021-09-02 11:54:38.828 INFO [0x10d72ae00] ClientConnection:181 | [<none> -> pulsar://localhost:6650] Create ClientConnection, timeout=10000
2021-09-02 11:54:38.828 INFO [0x10d72ae00] ConnectionPool:96 | Created connection for pulsar://localhost:6650
2021-09-02 11:54:38.969 INFO [0x700003136000] ClientConnection:367 | [127.0.0.1:62784 -> 127.0.0.1:6650] Connected to broker
2021-09-02 11:54:48.973 ERROR [0x700003136000] ClientConnection:532 | [127.0.0.1:62784 -> 127.0.0.1:6650] Connection was not established in 10000 ms, close the socket
2021-09-02 11:54:48.973 ERROR [0x700003136000] ClientConnection:572 | [127.0.0.1:62784 -> 127.0.0.1:6650] Read failed: Operation canceled
2021-09-02 11:54:48.973 INFO [0x700003136000] ClientConnection:1495 | [127.0.0.1:62784 -> 127.0.0.1:6650] Connection closed
2021-09-02 11:54:48.973 ERROR [0x700003136000] ClientImpl:188 | Error Checking/Getting Partition Metadata while creating producer on persistent://public/default/xxx -- ConnectError
Traceback (most recent call last):
# ...
_pulsar.ConnectError: Pulsar error: ConnectError
```
We can see that the connection attempt failed after 10 seconds (the default connect timeout).
---
pulsar-client-cpp/lib/ClientConnection.cc | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index fc40981..fc3582d 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -268,6 +268,7 @@ void ClientConnection::handlePulsarConnected(const CommandConnected& cmdConnecte
}
state_ = Ready;
+ connectTimeoutTask_->stop();
serverProtocolVersion_ = cmdConnected.protocol_version();
connectPromise_.setValue(shared_from_this());
@@ -368,7 +369,6 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_);
}
state_ = TcpConnected;
- connectTimeoutTask_->stop();
boost::system::error_code error;
socket_->set_option(tcp::no_delay(true), error);
@@ -527,7 +527,7 @@ void ClientConnection::handleResolve(const boost::system::error_code& err,
auto self = shared_from_this();
connectTimeoutTask_->setCallback([this, self](const PeriodicTask::ErrorCode& ec) {
- if (state_ != TcpConnected) {
+ if (state_ != Ready) {
LOG_ERROR(cnxString_ << "Connection was not established in " << connectTimeoutTask_->getPeriodMs()
<< " ms, close the socket");
PeriodicTask::ErrorCode err;