You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/12/02 18:29:50 UTC
[pulsar-client-cpp] branch main updated: [fix] Fix a crash when closing a connection while connecting (#136)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 7a7b3aa [fix] Fix a crash when closing a connection while connecting (#136)
7a7b3aa is described below
commit 7a7b3aa5faa802fa354d2b2e41d38ce22241d700
Author: erobot <er...@users.noreply.github.com>
AuthorDate: Sat Dec 3 02:29:44 2022 +0800
[fix] Fix a crash when closing a connection while connecting (#136)
---
lib/ClientConnection.cc | 20 +++++++++++++++++---
tests/ClientTest.cc | 15 +++++++++++++++
2 files changed, 32 insertions(+), 3 deletions(-)
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 48b88b5..5c14467 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -279,23 +279,30 @@ void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdC
LOG_DEBUG("Current max message size is: " << maxMessageSize_);
}
+ Lock lock(mutex_);
+
+ if (isClosed()) {
+ LOG_INFO(cnxString_ << "Connection already closed");
+ return;
+ }
state_ = Ready;
connectTimeoutTask_->stop();
serverProtocolVersion_ = cmdConnected.protocol_version();
- connectPromise_.setValue(shared_from_this());
if (serverProtocolVersion_ >= proto::v1) {
// Only send keep-alive probes if the broker supports it
keepAliveTimer_ = executor_->createDeadlineTimer();
- Lock lock(mutex_);
if (keepAliveTimer_) {
keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
keepAliveTimer_->async_wait(
std::bind(&ClientConnection::handleKeepAliveTimeout, shared_from_this()));
}
- lock.unlock();
}
+ lock.unlock();
+
+ connectPromise_.setValue(shared_from_this());
+
if (serverProtocolVersion_ >= proto::v8) {
startConsumerStatsTimer(std::vector<uint64_t>());
}
@@ -380,7 +387,14 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
} else {
LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_);
}
+
+ Lock lock(mutex_);
+ if (isClosed()) {
+ LOG_INFO(cnxString_ << "Connection already closed");
+ return;
+ }
state_ = TcpConnected;
+ lock.unlock();
boost::system::error_code error;
socket_->set_option(tcp::no_delay(true), error);
diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc
index bdb0555..a4abc3a 100644
--- a/tests/ClientTest.cc
+++ b/tests/ClientTest.cc
@@ -19,6 +19,7 @@
#include <gtest/gtest.h>
#include <pulsar/Client.h>
+#include <chrono>
#include <future>
#include "HttpHelper.h"
@@ -293,3 +294,17 @@ TEST(ClientTest, testMultiBrokerUrl) {
ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), {}, reader));
client.close();
}
+
+TEST(ClientTest, testCloseClient) {  // Added with the fix for a crash when closing a connection while connecting; has no assertions, so it passes by completing.
+ const std::string topic = "client-test-close-client-" + std::to_string(time(nullptr));
+
+ for (int i = 0; i < 1000; ++i) {
+ Client client(lookupUrl);
+ client.createProducerAsync(topic, [](Result result, Producer producer) { producer.close(); });  // result is ignored; the producer is closed unconditionally
+ // Delay the close by a different interval (i microseconds) on each iteration
+ auto t0 = std::chrono::steady_clock::now();
+ while ((std::chrono::steady_clock::now() - t0) < std::chrono::microseconds(i)) {  // busy-wait: spin until i microseconds have elapsed since t0
+ }
+ client.close();  // the asynchronous producer creation started above may still be in flight here
+ }
+}