You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/12/02 18:29:50 UTC

[pulsar-client-cpp] branch main updated: [fix] Fix a crash when closing a connection while connecting (#136)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 7a7b3aa  [fix] Fix a crash when closing a connection while connecting (#136)
7a7b3aa is described below

commit 7a7b3aa5faa802fa354d2b2e41d38ce22241d700
Author: erobot <er...@users.noreply.github.com>
AuthorDate: Sat Dec 3 02:29:44 2022 +0800

    [fix] Fix a crash when closing a connection while connecting (#136)
---
 lib/ClientConnection.cc | 20 +++++++++++++++++---
 tests/ClientTest.cc     | 15 +++++++++++++++
 2 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 48b88b5..5c14467 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -279,23 +279,30 @@ void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdC
         LOG_DEBUG("Current max message size is: " << maxMessageSize_);
     }
 
+    Lock lock(mutex_);
+
+    if (isClosed()) {
+        LOG_INFO(cnxString_ << "Connection already closed");
+        return;
+    }
     state_ = Ready;
     connectTimeoutTask_->stop();
     serverProtocolVersion_ = cmdConnected.protocol_version();
-    connectPromise_.setValue(shared_from_this());
 
     if (serverProtocolVersion_ >= proto::v1) {
         // Only send keep-alive probes if the broker supports it
         keepAliveTimer_ = executor_->createDeadlineTimer();
-        Lock lock(mutex_);
         if (keepAliveTimer_) {
             keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
             keepAliveTimer_->async_wait(
                 std::bind(&ClientConnection::handleKeepAliveTimeout, shared_from_this()));
         }
-        lock.unlock();
     }
 
+    lock.unlock();
+
+    connectPromise_.setValue(shared_from_this());
+
     if (serverProtocolVersion_ >= proto::v8) {
         startConsumerStatsTimer(std::vector<uint64_t>());
     }
@@ -380,7 +387,14 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
         } else {
             LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_);
         }
+
+        Lock lock(mutex_);
+        if (isClosed()) {
+            LOG_INFO(cnxString_ << "Connection already closed");
+            return;
+        }
         state_ = TcpConnected;
+        lock.unlock();
 
         boost::system::error_code error;
         socket_->set_option(tcp::no_delay(true), error);
diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc
index bdb0555..a4abc3a 100644
--- a/tests/ClientTest.cc
+++ b/tests/ClientTest.cc
@@ -19,6 +19,7 @@
 #include <gtest/gtest.h>
 #include <pulsar/Client.h>
 
+#include <chrono>
 #include <future>
 
 #include "HttpHelper.h"
@@ -293,3 +294,17 @@ TEST(ClientTest, testMultiBrokerUrl) {
     ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), {}, reader));
     client.close();
 }
+
+TEST(ClientTest, testCloseClient) {
+    const std::string topic = "client-test-close-client-" + std::to_string(time(nullptr));
+
+    for (int i = 0; i < 1000; ++i) {
+        Client client(lookupUrl);
+        client.createProducerAsync(topic, [](Result result, Producer producer) { producer.close(); });
+        // simulate different time interval before close
+        auto t0 = std::chrono::steady_clock::now();
+        while ((std::chrono::steady_clock::now() - t0) < std::chrono::microseconds(i)) {
+        }
+        client.close();
+    }
+}