Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/12/02 18:33:31 UTC

[pulsar-client-cpp] 01/02: [fix] Close broker producer created after producer close (#131)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git

commit aa6d52c99949f6c9c7ebbf38fecb1a268a06b424
Author: erobot <er...@users.noreply.github.com>
AuthorDate: Thu Dec 1 18:12:40 2022 +0800

    [fix] Close broker producer created after producer close (#131)
    
    ### Motivation
    
    Close the broker-side producer when it is created after the client producer has already been closed, to prevent producers from staying alive in the broker, and fix a race condition in this path.
    
    Race condition sequence (illustrated by the sketch after the list):
    1. producer.start() is called
    2. handleCreateProducer() is called and runs up to just before setCnx()
    3. closeAsync() is called; since getCnx() returns nullptr it returns immediately, so no CloseProducer command is sent
    4. handleCreateProducer() continues
    5. Result: the producer is closed on the client side, but no CloseProducer command is ever sent to the broker
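    
    A self-contained sketch of that window, assuming a deliberately simplified model (RacyProducer, connectionSet, and the boolean flags are illustrative stand-ins for ProducerImpl's cnx pointer and state, not the actual client code):
    
    ```cpp
    #include <iostream>
    
    // Pre-fix logic, simplified: `connectionSet` models the cnx pointer that
    // closeAsync() checks via getCnx().
    struct RacyProducer {
        bool connectionSet = false;
        bool closed = false;
    
        void closeAsync() {
            closed = true;
            if (!connectionSet) {
                return;  // step 3: no connection yet, so no CloseProducer command is sent
            }
            std::cout << "CloseProducer sent to broker\n";
        }
    
        void handleCreateProducer() {
            connectionSet = true;  // step 4: stands in for setCnx(cnx); state_ = Ready;
        }
    };
    
    int main() {
        RacyProducer p;
        p.closeAsync();            // runs in the window before setCnx()
        p.handleCreateProducer();  // the create-producer response is processed afterwards
        // step 5: the client considers the producer closed, but the broker-side
        // producer created by this response is never closed.
        return 0;
    }
    ```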
    
    ### Modifications
    
    * Close the broker-side producer if it was created after the client producer was closed.
    * Use a mutex to prevent the race condition (see the sketch below). handleCreateProducer() and closeAsync() should be low-frequency operations, so taking a mutex here should not affect performance.
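    
    A minimal sketch of the locking pattern, assuming simplified illustrative names (FixedProducer, sendCloseToBroker(), and the ready_/closed_ flags are invented for the example); the real implementation is in ProducerImpl.cc in the diff below:
    
    ```cpp
    #include <iostream>
    #include <mutex>
    #include <thread>
    
    // Both the create-producer response handler and closeAsync() take the same
    // mutex, so the "check state, then set the connection" sequence can no longer
    // interleave with close. If close already ran, the handler sends the close
    // request to the broker itself.
    class FixedProducer {
       public:
        void handleCreateProducer() {
            std::lock_guard<std::mutex> lock(mutex_);
            if (closed_) {
                sendCloseToBroker();  // broker-side producer was created after close
                return;
            }
            ready_ = true;  // stands in for setCnx(cnx); state_ = Ready;
        }
    
        void closeAsync() {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
            if (ready_) {
                sendCloseToBroker();  // normal path: connection already set
            }
            // otherwise handleCreateProducer() sends it when the response arrives
        }
    
       private:
        void sendCloseToBroker() { std::cout << "CloseProducer sent to broker\n"; }
    
        std::mutex mutex_;
        bool ready_ = false;
        bool closed_ = false;
    };
    
    int main() {
        FixedProducer p;
        std::thread closer([&p] { p.closeAsync(); });
        std::thread creator([&p] { p.handleCreateProducer(); });
        closer.join();
        creator.join();
        return 0;  // exactly one "CloseProducer sent to broker" line, in either ordering
    }
    ```
    
    In the actual change below, handleCreateProducer() only sends the CloseProducer command when the create result was ResultOk or ResultTimeout, and completes producerCreatedPromise_ with ResultAlreadyClosed after releasing the lock.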
---
 lib/ProducerImpl.cc   | 35 +++++++++++++++++++++++++++--------
 tests/ProducerTest.cc | 43 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 70 insertions(+), 8 deletions(-)

diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 7fa3ff2..a3a5a95 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -172,6 +172,8 @@ void ProducerImpl::connectionFailed(Result result) {
 
 void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
                                         const ResponseData& responseData) {
+    Lock lock(mutex_);
+
     LOG_DEBUG(getName() << "ProducerImpl::handleCreateProducer res: " << strResult(result));
 
     // make sure we're still in the Pending/Ready state, closeAsync could have been invoked
@@ -180,11 +182,21 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
     if (state != Ready && state != Pending) {
         LOG_DEBUG("Producer created response received but producer already closed");
         failPendingMessages(ResultAlreadyClosed, false);
+        if (result == ResultOk || result == ResultTimeout) {
+            auto client = client_.lock();
+            if (client) {
+                int requestId = client->newRequestId();
+                cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId);
+            }
+        }
+        if (!producerCreatedPromise_.isComplete()) {
+            lock.unlock();
+            producerCreatedPromise_.setFailed(ResultAlreadyClosed);
+        }
         return;
     }
 
     if (result == ResultOk) {
-        Lock lock(mutex_);
         // We are now reconnected to broker and clear to send messages. Re-send all pending messages and
         // set the cnx pointer so that new messages will be sent immediately
         LOG_INFO(getName() << "Created producer on broker " << cnx->cnxString());
@@ -203,7 +215,6 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
         setCnx(cnx);
         state_ = Ready;
         backoff_.reset();
-        lock.unlock();
 
         if (conf_.isEncryptionEnabled()) {
             auto weakSelf = weak_from_this();
@@ -226,6 +237,7 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
             startSendTimeoutTimer();
         }
 
+        lock.unlock();
         producerCreatedPromise_.setValue(shared_from_this());
 
     } else {
@@ -234,22 +246,26 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
             // Creating the producer has timed out. We need to ensure the broker closes the producer
             // in case it was indeed created, otherwise it might prevent new create producer operation,
             // since we are not closing the connection
-            int requestId = client_.lock()->newRequestId();
-            cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId);
+            auto client = client_.lock();
+            if (client) {
+                int requestId = client->newRequestId();
+                cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId);
+            }
         }
 
         if (result == ResultProducerFenced) {
             state_ = Producer_Fenced;
-            failPendingMessages(result, true);
+            failPendingMessages(result, false);
             auto client = client_.lock();
             if (client) {
                 client->cleanupProducer(this);
             }
+            lock.unlock();
             producerCreatedPromise_.setFailed(result);
         } else if (producerCreatedPromise_.isComplete()) {
             if (result == ResultProducerBlockedQuotaExceededException) {
                 LOG_WARN(getName() << "Backlog is exceeded on topic. Sending exception to producer");
-                failPendingMessages(ResultProducerBlockedQuotaExceededException, true);
+                failPendingMessages(ResultProducerBlockedQuotaExceededException, false);
             } else if (result == ResultProducerBlockedQuotaExceededError) {
                 LOG_WARN(getName() << "Producer is blocked on creation because backlog is exceeded on topic");
             }
@@ -264,9 +280,10 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
                 scheduleReconnection(shared_from_this());
             } else {
                 LOG_ERROR(getName() << "Failed to create producer: " << strResult(result));
-                failPendingMessages(result, true);
-                producerCreatedPromise_.setFailed(result);
+                failPendingMessages(result, false);
                 state_ = Failed;
+                lock.unlock();
+                producerCreatedPromise_.setFailed(result);
             }
         }
     }
@@ -694,6 +711,8 @@ void ProducerImpl::closeAsync(CloseCallback originalCallback) {
         }
     };
 
+    Lock lock(mutex_);
+
     // if the producer was never started then there is nothing to clean up
     State expectedState = NotStarted;
     if (state_.compare_exchange_strong(expectedState, Closed)) {
diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc
index 74d3cf2..6969735 100644
--- a/tests/ProducerTest.cc
+++ b/tests/ProducerTest.cc
@@ -424,4 +424,47 @@ TEST(ProducerTest, testCloseSubProducerWhenFail) {
     client.close();
 }
 
+TEST(ProducerTest, testCloseProducerBeforeCreated) {
+    Client client(serviceUrl);
+
+    std::string ns = "test-close-producer-before-created";
+    std::string localName = std::string("testCloseProducerBeforeCreated") + std::to_string(time(nullptr));
+    std::string topicName = "persistent://public/" + ns + '/' + localName;
+    const int maxProducersPerTopic = 10;
+    const int partitionNum = 5;
+
+    // call admin API to create the namespace with a max producer limit
+    std::string url = adminUrl + "admin/v2/namespaces/public/" + ns;
+    int res =
+        makePutRequest(url, "{\"max_producers_per_topic\": " + std::to_string(maxProducersPerTopic) + "}");
+    ASSERT_TRUE(res == 204 || res == 409) << "res:" << res;
+
+    // call admin api to create partitioned topic
+    res = makePutRequest(adminUrl + "admin/v2/persistent/public/" + ns + "/" + localName + "/partitions",
+                         std::to_string(partitionNum));
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+    ProducerConfiguration producerConfiguration;
+    producerConfiguration.setLazyStartPartitionedProducers(true);
+    producerConfiguration.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
+    producerConfiguration.setBatchingEnabled(false);
+
+    Message msg = MessageBuilder().setContent("test").build();
+    for (int i = 0; i < maxProducersPerTopic * 100; ++i) {
+        Producer producer;
+        ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration, producer));
+        // trigger lazy producer creation
+        for (int j = 0; j < partitionNum; ++j) {
+            producer.sendAsync(msg, [](pulsar::Result, const pulsar::MessageId&) {});
+        }
+        producer.close();
+    }
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, {}, producer));
+    producer.close();
+
+    client.close();
+}
+
 INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false));