Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/12/02 18:33:30 UTC

[pulsar-client-cpp] branch branch-3.1 updated (f9cca17 -> 9a4b0d1)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a change to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


    from f9cca17  [fix] Fix acknowledge MessageId list does not work when ackGroupingTimeMs is 0 (#128)
     new aa6d52c  [fix] Close broker producer created after producer close (#131)
     new 9a4b0d1  [fix] Fix a crash when closing a connection while connecting (#136)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 lib/ClientConnection.cc | 20 +++++++++++++++++---
 lib/ProducerImpl.cc     | 35 +++++++++++++++++++++++++++--------
 tests/ClientTest.cc     | 15 +++++++++++++++
 tests/ProducerTest.cc   | 43 +++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 102 insertions(+), 11 deletions(-)


[pulsar-client-cpp] 02/02: [fix] Fix a crash when closing a connection while connecting (#136)

Posted by mm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git

commit 9a4b0d17d618c540fa6773484d540150369881ad
Author: erobot <er...@users.noreply.github.com>
AuthorDate: Sat Dec 3 02:29:44 2022 +0800

    [fix] Fix a crash when closing a connection while connecting (#136)
---
 lib/ClientConnection.cc | 20 +++++++++++++++++---
 tests/ClientTest.cc     | 15 +++++++++++++++
 2 files changed, 32 insertions(+), 3 deletions(-)
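
The change below guards the connect callbacks with the connection's mutex, so a close() issued from another thread can no longer race with the TcpConnected/Ready state transitions, and the connect promise is completed only after the lock is released. A minimal standalone sketch of that pattern, using hypothetical names (Connection, closed_, connectPromise_) rather than the client's real types:

#include <future>
#include <iostream>
#include <memory>
#include <mutex>

// Hypothetical connection type; it only illustrates the locking pattern
// used in the diff below and is not the client's actual API.
class Connection : public std::enable_shared_from_this<Connection> {
   public:
    // Runs on an I/O thread when the broker handshake completes.
    void handleConnected() {
        std::unique_lock<std::mutex> lock(mutex_);
        if (closed_) {
            // close() already ran: skip the state transition and timer setup.
            std::cout << "connection already closed, ignoring connect event\n";
            return;
        }
        ready_ = true;
        // ... start keep-alive timers here, still under the lock ...
        lock.unlock();
        // Complete the promise only after releasing the lock, so code that
        // reacts to the connect result does not run while mutex_ is held.
        connectPromise_.set_value(shared_from_this());
    }

    // May run on any thread while the connection is still being set up.
    void close() {
        std::lock_guard<std::mutex> lock(mutex_);
        closed_ = true;
        // ... cancel timers and close the socket here ...
    }

   private:
    std::mutex mutex_;
    bool closed_ = false;
    bool ready_ = false;
    std::promise<std::shared_ptr<Connection>> connectPromise_;
};

int main() {
    auto cnx = std::make_shared<Connection>();
    cnx->close();            // close() wins the race...
    cnx->handleConnected();  // ...and the connect event is ignored instead of touching torn-down state
}

In this sketch, whichever of close() and handleConnected() runs second sees a consistent view of the connection, which mirrors the property the diff below establishes.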

diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 48b88b5..5c14467 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -279,23 +279,30 @@ void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdC
         LOG_DEBUG("Current max message size is: " << maxMessageSize_);
     }
 
+    Lock lock(mutex_);
+
+    if (isClosed()) {
+        LOG_INFO(cnxString_ << "Connection already closed");
+        return;
+    }
     state_ = Ready;
     connectTimeoutTask_->stop();
     serverProtocolVersion_ = cmdConnected.protocol_version();
-    connectPromise_.setValue(shared_from_this());
 
     if (serverProtocolVersion_ >= proto::v1) {
         // Only send keep-alive probes if the broker supports it
         keepAliveTimer_ = executor_->createDeadlineTimer();
-        Lock lock(mutex_);
         if (keepAliveTimer_) {
             keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
             keepAliveTimer_->async_wait(
                 std::bind(&ClientConnection::handleKeepAliveTimeout, shared_from_this()));
         }
-        lock.unlock();
     }
 
+    lock.unlock();
+
+    connectPromise_.setValue(shared_from_this());
+
     if (serverProtocolVersion_ >= proto::v8) {
         startConsumerStatsTimer(std::vector<uint64_t>());
     }
@@ -380,7 +387,14 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
         } else {
             LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_);
         }
+
+        Lock lock(mutex_);
+        if (isClosed()) {
+            LOG_INFO(cnxString_ << "Connection already closed");
+            return;
+        }
         state_ = TcpConnected;
+        lock.unlock();
 
         boost::system::error_code error;
         socket_->set_option(tcp::no_delay(true), error);
diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc
index bdb0555..a4abc3a 100644
--- a/tests/ClientTest.cc
+++ b/tests/ClientTest.cc
@@ -19,6 +19,7 @@
 #include <gtest/gtest.h>
 #include <pulsar/Client.h>
 
+#include <chrono>
 #include <future>
 
 #include "HttpHelper.h"
@@ -293,3 +294,17 @@ TEST(ClientTest, testMultiBrokerUrl) {
     ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), {}, reader));
     client.close();
 }
+
+TEST(ClientTest, testCloseClient) {
+    const std::string topic = "client-test-close-client-" + std::to_string(time(nullptr));
+
+    for (int i = 0; i < 1000; ++i) {
+        Client client(lookupUrl);
+        client.createProducerAsync(topic, [](Result result, Producer producer) { producer.close(); });
+        // simulate different time interval before close
+        auto t0 = std::chrono::steady_clock::now();
+        while ((std::chrono::steady_clock::now() - t0) < std::chrono::microseconds(i)) {
+        }
+        client.close();
+    }
+}


[pulsar-client-cpp] 01/02: [fix] Close broker producer created after producer close (#131)

Posted by mm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git

commit aa6d52c99949f6c9c7ebbf38fecb1a268a06b424
Author: erobot <er...@users.noreply.github.com>
AuthorDate: Thu Dec 1 18:12:40 2022 +0800

    [fix] Close broker producer created after producer close (#131)
    
    ### Motivation
    
    Close the broker-side producer that is created after the client producer has already been closed, to prevent orphaned producers from staying alive on the broker, and fix a race condition in this path.
    
    Race condition sequence:
    1. producer.start() is called
    2. handleCreateProducer() is called and runs up to just before setCnx()
    3. closeAsync() is called and returns immediately because getCnx() returns nullptr, so no CloseProducer command is sent
    4. handleCreateProducer() continues
    5. Result: the producer is closed locally, but no CloseProducer command is ever sent to the broker
    
    ### Modifications
    
    * Close the broker-side producer when it is created after the client producer has been closed
    * Use a mutex to prevent the race condition. handleCreateProducer() and closeAsync() are low-frequency operations, so taking the mutex here should not affect performance (a sketch of the pattern follows below).
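
A minimal sketch of the locking pattern described above, using hypothetical names (ProducerState, Connection, sendCloseProducer) rather than the client's real types; it only illustrates how holding one mutex across both handleCreateProducer() and closeAsync() removes the window between the state check and the connection being recorded, and how the broker is still told to drop the producer when its response arrives after the close:

#include <iostream>
#include <memory>
#include <mutex>

// Hypothetical stand-ins for the real connection and wire commands.
struct Connection {
    void sendCloseProducer(long producerId) {
        std::cout << "CloseProducer sent for producer " << producerId << "\n";
    }
};

class ProducerState {
   public:
    // Broker replied to CreateProducer; may race with closeAsync().
    void handleCreateProducer(const std::shared_ptr<Connection>& cnx) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (closed_) {
            // The application already closed the producer: ask the broker to
            // drop the producer it just created instead of leaking it.
            cnx->sendCloseProducer(producerId_);
            return;
        }
        cnx_ = cnx;  // record the connection only while still open
    }

    // Application-facing close; takes the same mutex as the callback above,
    // so there is no window between the state check and cnx_ being set.
    void closeAsync() {
        std::lock_guard<std::mutex> lock(mutex_);
        closed_ = true;
        if (cnx_) {
            cnx_->sendCloseProducer(producerId_);
            cnx_.reset();
        }
        // If cnx_ was never set, handleCreateProducer() will see closed_ and
        // send CloseProducer itself when (or if) the broker response arrives.
    }

   private:
    std::mutex mutex_;
    bool closed_ = false;
    long producerId_ = 1;
    std::shared_ptr<Connection> cnx_;
};

int main() {
    auto cnx = std::make_shared<Connection>();
    ProducerState producer;
    producer.closeAsync();               // close wins the race
    producer.handleCreateProducer(cnx);  // the late broker response still triggers CloseProducer
}

In this sketch, whichever path runs second observes the other's effect, so the broker-side producer is asked to close exactly once regardless of the ordering.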
---
 lib/ProducerImpl.cc   | 35 +++++++++++++++++++++++++++--------
 tests/ProducerTest.cc | 43 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 70 insertions(+), 8 deletions(-)

diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 7fa3ff2..a3a5a95 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -172,6 +172,8 @@ void ProducerImpl::connectionFailed(Result result) {
 
 void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
                                         const ResponseData& responseData) {
+    Lock lock(mutex_);
+
     LOG_DEBUG(getName() << "ProducerImpl::handleCreateProducer res: " << strResult(result));
 
     // make sure we're still in the Pending/Ready state, closeAsync could have been invoked
@@ -180,11 +182,21 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
     if (state != Ready && state != Pending) {
         LOG_DEBUG("Producer created response received but producer already closed");
         failPendingMessages(ResultAlreadyClosed, false);
+        if (result == ResultOk || result == ResultTimeout) {
+            auto client = client_.lock();
+            if (client) {
+                int requestId = client->newRequestId();
+                cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId);
+            }
+        }
+        if (!producerCreatedPromise_.isComplete()) {
+            lock.unlock();
+            producerCreatedPromise_.setFailed(ResultAlreadyClosed);
+        }
         return;
     }
 
     if (result == ResultOk) {
-        Lock lock(mutex_);
         // We are now reconnected to broker and clear to send messages. Re-send all pending messages and
         // set the cnx pointer so that new messages will be sent immediately
         LOG_INFO(getName() << "Created producer on broker " << cnx->cnxString());
@@ -203,7 +215,6 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
         setCnx(cnx);
         state_ = Ready;
         backoff_.reset();
-        lock.unlock();
 
         if (conf_.isEncryptionEnabled()) {
             auto weakSelf = weak_from_this();
@@ -226,6 +237,7 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
             startSendTimeoutTimer();
         }
 
+        lock.unlock();
         producerCreatedPromise_.setValue(shared_from_this());
 
     } else {
@@ -234,22 +246,26 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
             // Creating the producer has timed out. We need to ensure the broker closes the producer
             // in case it was indeed created, otherwise it might prevent new create producer operation,
             // since we are not closing the connection
-            int requestId = client_.lock()->newRequestId();
-            cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId);
+            auto client = client_.lock();
+            if (client) {
+                int requestId = client->newRequestId();
+                cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId);
+            }
         }
 
         if (result == ResultProducerFenced) {
             state_ = Producer_Fenced;
-            failPendingMessages(result, true);
+            failPendingMessages(result, false);
             auto client = client_.lock();
             if (client) {
                 client->cleanupProducer(this);
             }
+            lock.unlock();
             producerCreatedPromise_.setFailed(result);
         } else if (producerCreatedPromise_.isComplete()) {
             if (result == ResultProducerBlockedQuotaExceededException) {
                 LOG_WARN(getName() << "Backlog is exceeded on topic. Sending exception to producer");
-                failPendingMessages(ResultProducerBlockedQuotaExceededException, true);
+                failPendingMessages(ResultProducerBlockedQuotaExceededException, false);
             } else if (result == ResultProducerBlockedQuotaExceededError) {
                 LOG_WARN(getName() << "Producer is blocked on creation because backlog is exceeded on topic");
             }
@@ -264,9 +280,10 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
                 scheduleReconnection(shared_from_this());
             } else {
                 LOG_ERROR(getName() << "Failed to create producer: " << strResult(result));
-                failPendingMessages(result, true);
-                producerCreatedPromise_.setFailed(result);
+                failPendingMessages(result, false);
                 state_ = Failed;
+                lock.unlock();
+                producerCreatedPromise_.setFailed(result);
             }
         }
     }
@@ -694,6 +711,8 @@ void ProducerImpl::closeAsync(CloseCallback originalCallback) {
         }
     };
 
+    Lock lock(mutex_);
+
     // if the producer was never started then there is nothing to clean up
     State expectedState = NotStarted;
     if (state_.compare_exchange_strong(expectedState, Closed)) {
diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc
index 74d3cf2..6969735 100644
--- a/tests/ProducerTest.cc
+++ b/tests/ProducerTest.cc
@@ -424,4 +424,47 @@ TEST(ProducerTest, testCloseSubProducerWhenFail) {
     client.close();
 }
 
+TEST(ProducerTest, testCloseProducerBeforeCreated) {
+    Client client(serviceUrl);
+
+    std::string ns = "test-close-producer-before-created";
+    std::string localName = std::string("testCloseProducerBeforeCreated") + std::to_string(time(nullptr));
+    std::string topicName = "persistent://public/" + ns + '/' + localName;
+    const int maxProducersPerTopic = 10;
+    const int partitionNum = 5;
+
+    // call admin api to create namespace with max producer limit
+    std::string url = adminUrl + "admin/v2/namespaces/public/" + ns;
+    int res =
+        makePutRequest(url, "{\"max_producers_per_topic\": " + std::to_string(maxProducersPerTopic) + "}");
+    ASSERT_TRUE(res == 204 || res == 409) << "res:" << res;
+
+    // call admin api to create partitioned topic
+    res = makePutRequest(adminUrl + "admin/v2/persistent/public/" + ns + "/" + localName + "/partitions",
+                         std::to_string(partitionNum));
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+    ProducerConfiguration producerConfiguration;
+    producerConfiguration.setLazyStartPartitionedProducers(true);
+    producerConfiguration.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
+    producerConfiguration.setBatchingEnabled(false);
+
+    Message msg = MessageBuilder().setContent("test").build();
+    for (int i = 0; i < maxProducersPerTopic * 100; ++i) {
+        Producer producer;
+        ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration, producer));
+        // trigger lazy producer creation
+        for (int j = 0; j < partitionNum; ++j) {
+            producer.sendAsync(msg, [](pulsar::Result, const pulsar::MessageId&) {});
+        }
+        producer.close();
+    }
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, {}, producer));
+    producer.close();
+
+    client.close();
+}
+
 INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false));