Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/05/20 11:48:57 UTC

[pulsar] 02/12: [C++] Fix UnknownError might be returned for a partitioned producer (#15161)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 95b5ef8be37da96c99711bddad2d7996dd80007d
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Apr 14 14:59:46 2022 +0800

    [C++] Fix UnknownError might be returned for a partitioned producer (#15161)
    
    Fixes #15078
    
    When a partitioned producer is created and some of the partitions
    fail to create their associated single producers, `closeAsync` will
    be called immediately, even if the other partitions are still in the
    process of creating theirs.
    
    Since `closeAsync` is called before `setFailed` is called on the
    `partitionedProducerCreatedPromise_` field, there is a race
    condition in which all single producers are closed before the
    promise is set. In that case, the promise will be set with
    `ResultUnknownError`, see
    https://github.com/apache/pulsar/blob/4aeeed5dab9dfe9493526f36d539b3ef29cf6fe5/pulsar-client-cpp/lib/PartitionedProducerImpl.cc#L317.
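    
    A minimal, self-contained model of the race (illustrative only; the
    real client uses its own Promise type, but the first-completion-wins
    semantics are the same):
    
        #include <atomic>
        #include <cstdio>
        #include <thread>
        
        enum Result { ResultOk, ResultUnknownError, ResultProducerBlockedQuotaExceededError };
        
        // Whichever thread completes the once-only promise first decides
        // the Result that the caller of createProducer() observes.
        std::atomic<bool> completed{false};
        std::atomic<Result> promised{ResultOk};
        
        void setFailed(Result r) {
            bool expected = false;
            if (completed.compare_exchange_strong(expected, true)) {
                promised = r;  // only the first caller wins
            }
        }
        
        int main() {
            // The close handler may finish first, reporting ResultUnknownError...
            std::thread closeHandler([] { setFailed(ResultUnknownError); });
            // ...while the creation callback tries to report the real error.
            setFailed(ResultProducerBlockedQuotaExceededError);
            closeHandler.join();
            std::printf("observed: %d\n", (int)promised.load());  // either result
            return 0;
        }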
    
    Now `closeAsync` is called only after all single producers have
    either failed or succeeded, and only if at least one of them failed.
    This guarantees that `partitionedProducerCreatedPromise_` is
    completed before `closeAsync` is called.
    
    This PR also makes the state of a partitioned producer atomic,
    because protecting it with a mutex makes the code harder to write
    correctly.
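    
    A sketch of the fixed ordering (simplified, with hypothetical stub
    names; the real change is in the diff below):
    
        #include <atomic>
        #include <cstdio>
        
        enum Result { ResultOk, ResultProducerBlockedQuotaExceededError };
        enum State { Pending, Ready, Failed };
        
        std::atomic<State> state{Pending};  // atomic instead of mutex-guarded
        std::atomic_uint numProducersCreated{0};
        const unsigned numPartitions = 5;
        std::atomic<bool> promiseCompleted{false};
        
        void setPromiseFailed(Result r) {  // stands in for the Promise field
            bool expected = false;
            if (promiseCompleted.compare_exchange_strong(expected, true)) {
                std::printf("promise failed with %d\n", (int)r);
            }
        }
        
        void closeAsyncStub() { std::puts("cleanup runs; promise already set"); }
        
        void handleCreated(Result result) {
            if (result != ResultOk) {
                setPromiseFailed(result);  // complete the promise BEFORE closing
                state = Failed;
            }
            // Cleanup is deferred until the LAST callback (ok or failed) arrives.
            if (++numProducersCreated == numPartitions) {
                if (state == Failed) {
                    closeAsyncStub();
                } else {
                    state = Ready;
                }
            }
        }
        
        int main() {
            for (unsigned i = 0; i < numPartitions; ++i) {
                handleCreated(i == 2 ? ResultProducerBlockedQuotaExceededError : ResultOk);
            }
            return 0;
        }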
    
    Create a separate namespace `public/test-backlog-quotas` to test the
    case when the backlog quota is exceeded. Then add a
    `testBacklogQuotasExceeded` test that builds up a backlog by
    creating a consumer and sending some messages to one partition of
    the topic.
    
    In this test, only one partition has a backlog and fails with the
    related error, so the test verifies that `createProducer` returns
    the correct error instead of `ResultUnknownError`.
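    
    For reference, each quota switch in the test boils down to a single
    admin REST call; a minimal sketch (the `makePostRequest` helper and
    `adminUrl` come from the C++ test suite):
    
        int res = makePostRequest(
            adminUrl + "admin/v2/namespaces/public/test-backlog-quotas/backlogQuota",
            R"({"policy":"producer_request_hold","limitSize":1024})");
        // Both 204 (applied) and 409 (already configured) are acceptable.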
    
    (cherry picked from commit 0f8559611914a17efe222bf6ed1dd621d5a9cf45)
---
 pulsar-client-cpp/lib/PartitionedProducerImpl.cc | 55 +++++++++---------------
 pulsar-client-cpp/lib/PartitionedProducerImpl.h  | 12 ++----
 pulsar-client-cpp/pulsar-test-service-start.sh   |  4 ++
 pulsar-client-cpp/tests/ProducerTest.cc          | 43 ++++++++++++++++++
 4 files changed, 71 insertions(+), 43 deletions(-)

diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index bdd23ed6c91..9f197f0d45f 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -136,28 +136,29 @@ void PartitionedProducerImpl::handleSinglePartitionProducerCreated(Result result
                                                                    unsigned int partitionIndex) {
     // to indicate, we are doing cleanup using closeAsync after producer create
     // has failed and the invocation of closeAsync is not from client
-    CloseCallback closeCallback = NULL;
-    Lock lock(mutex_);
+    const auto numPartitions = getNumPartitionsWithLock();
+    assert(numProducersCreated_ <= numPartitions && partitionIndex <= numPartitions);
+
     if (state_ == Failed) {
-        // Ignore, we have already informed client that producer creation failed
+        // We have already informed client that producer creation failed
+        if (++numProducersCreated_ == numPartitions) {
+            closeAsync(nullptr);
+        }
         return;
     }
-    const auto numPartitions = getNumPartitionsWithLock();
-    assert(numProducersCreated_ <= numPartitions);
+
     if (result != ResultOk) {
-        state_ = Failed;
-        lock.unlock();
-        closeAsync(closeCallback);
-        partitionedProducerCreatedPromise_.setFailed(result);
         LOG_ERROR("Unable to create Producer for partition - " << partitionIndex << " Error - " << result);
+        partitionedProducerCreatedPromise_.setFailed(result);
+        state_ = Failed;
+        if (++numProducersCreated_ == numPartitions) {
+            closeAsync(nullptr);
+        }
         return;
     }
 
-    assert(partitionIndex <= numPartitions);
-    numProducersCreated_++;
-    if (numProducersCreated_ == numPartitions) {
+    if (++numProducersCreated_ == numPartitions) {
         state_ = Ready;
-        lock.unlock();
         if (partitionsUpdateTimer_) {
             runPartitionUpdateTask();
         }
@@ -181,7 +182,7 @@ void PartitionedProducerImpl::createLazyPartitionProducer(unsigned int partition
 
 // override
 void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
-    if (!assertState(Ready)) {
+    if (state_ != Ready) {
         callback(ResultAlreadyClosed, msg.getMessageId());
         return;
     }
@@ -211,18 +212,7 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac
 }
 
 // override
-void PartitionedProducerImpl::shutdown() { setState(Closed); }
-
-void PartitionedProducerImpl::setState(const PartitionedProducerState state) {
-    Lock lock(mutex_);
-    state_ = state;
-    lock.unlock();
-}
-
-bool PartitionedProducerImpl::assertState(const PartitionedProducerState state) {
-    Lock lock(mutex_);
-    return state_ == state;
-}
+void PartitionedProducerImpl::shutdown() { state_ = Closed; }
 
 const std::string& PartitionedProducerImpl::getProducerName() const {
     Lock producersLock(producersMutex_);
@@ -251,7 +241,10 @@ int64_t PartitionedProducerImpl::getLastSequenceId() const {
  * create one or many producers for partitions. So, we have to notify with ERROR on createProducerFailure
  */
 void PartitionedProducerImpl::closeAsync(CloseCallback closeCallback) {
-    setState(Closing);
+    if (state_ == Closing || state_ == Closed) {
+        return;
+    }
+    state_ = Closing;
 
     unsigned int producerAlreadyClosed = 0;
 
@@ -280,7 +273,7 @@ void PartitionedProducerImpl::closeAsync(CloseCallback closeCallback) {
      * handleSinglePartitionProducerCreated
      */
     if (producerAlreadyClosed == numProducers && closeCallback) {
-        setState(Closed);
+        state_ = Closed;
         closeCallback(ResultOk);
     }
 }
@@ -288,14 +281,12 @@ void PartitionedProducerImpl::closeAsync(CloseCallback closeCallback) {
 void PartitionedProducerImpl::handleSinglePartitionProducerClose(Result result,
                                                                  const unsigned int partitionIndex,
                                                                  CloseCallback callback) {
-    Lock lock(mutex_);
     if (state_ == Failed) {
         // we should have already notified the client by callback
         return;
     }
     if (result != ResultOk) {
         state_ = Failed;
-        lock.unlock();
         LOG_ERROR("Closing the producer failed for partition - " << partitionIndex);
         if (callback) {
             callback(result);
@@ -309,7 +300,6 @@ void PartitionedProducerImpl::handleSinglePartitionProducerClose(Result result,
     // closed all successfully
     if (!numProducersCreated_) {
         state_ = Closed;
-        lock.unlock();
         // set the producerCreatedPromise to failure, if client called
         // closeAsync and it's not failure to create producer, the promise
         // is set second time here, first time it was successful. So check
@@ -395,7 +385,6 @@ void PartitionedProducerImpl::getPartitionMetadata() {
 
 void PartitionedProducerImpl::handleGetPartitions(Result result,
                                                   const LookupDataResultPtr& lookupDataResult) {
-    Lock stateLock(mutex_);
     if (state_ != Ready) {
         return;
     }
@@ -428,11 +417,9 @@ void PartitionedProducerImpl::handleGetPartitions(Result result,
 }
 
 bool PartitionedProducerImpl::isConnected() const {
-    Lock stateLock(mutex_);
     if (state_ != Ready) {
         return false;
     }
-    stateLock.unlock();
 
     Lock producersLock(producersMutex_);
     const auto producers = producers_;
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.h b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
index 874d6cda526..0a8c10e2213 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
@@ -30,7 +30,7 @@ namespace pulsar {
 class PartitionedProducerImpl : public ProducerImplBase,
                                 public std::enable_shared_from_this<PartitionedProducerImpl> {
    public:
-    enum PartitionedProducerState
+    enum State
     {
         Pending,
         Ready,
@@ -73,8 +73,6 @@ class PartitionedProducerImpl : public ProducerImplBase,
 
     void notifyResult(CloseCallback closeCallback);
 
-    void setState(PartitionedProducerState state);
-
     friend class PulsarFriend;
 
    private:
@@ -83,7 +81,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
     const TopicNamePtr topicName_;
     const std::string topic_;
 
-    unsigned int numProducersCreated_ = 0;
+    std::atomic_uint numProducersCreated_{0};
 
     /*
     * set when one or more Single Partition Creation fails, close will cleanup and fail the create callback
@@ -99,10 +97,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
     mutable std::mutex producersMutex_;
     MessageRoutingPolicyPtr routerPolicy_;
 
-    // mutex_ is used to share state_, and numProducersCreated_
-    mutable std::mutex mutex_;
-
-    PartitionedProducerState state_ = Pending;
+    std::atomic<State> state_{Pending};
 
     // only set this promise to value, when producers on all partitions are created.
     Promise<Result, ProducerImplBaseWeakPtr> partitionedProducerCreatedPromise_;
@@ -124,7 +119,6 @@ class PartitionedProducerImpl : public ProducerImplBase,
     void runPartitionUpdateTask();
     void getPartitionMetadata();
     void handleGetPartitions(const Result result, const LookupDataResultPtr& partitionMetadata);
-    bool assertState(const PartitionedProducerState state);
 };
 
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/pulsar-test-service-start.sh b/pulsar-client-cpp/pulsar-test-service-start.sh
index 248d628b9c2..2bee18e64b9 100755
--- a/pulsar-client-cpp/pulsar-test-service-start.sh
+++ b/pulsar-client-cpp/pulsar-test-service-start.sh
@@ -106,6 +106,10 @@ $PULSAR_DIR/bin/pulsar-admin namespaces grant-permission public/default-4 \
                         --role "anonymous"
 $PULSAR_DIR/bin/pulsar-admin namespaces set-encryption-required public/default-4 -e
 
+# Create "public/test-backlog-quotas" to test backlog quotas policy
+$PULSAR_DIR/bin/pulsar-admin namespaces create public/test-backlog-quotas \
+                        --clusters standalone
+
 # Create "private" tenant
 $PULSAR_DIR/bin/pulsar-admin tenants create private -r "" -c "standalone"
 
diff --git a/pulsar-client-cpp/tests/ProducerTest.cc b/pulsar-client-cpp/tests/ProducerTest.cc
index 210f01345d4..14461429da3 100644
--- a/pulsar-client-cpp/tests/ProducerTest.cc
+++ b/pulsar-client-cpp/tests/ProducerTest.cc
@@ -240,4 +240,47 @@ TEST(ProducerTest, testSendAsyncCloseAsyncConcurrentlyWithLazyProducers) {
         client.close();
         LOG_INFO("End of run " << run);
     }
+}
+
+TEST(ProducerTest, testBacklogQuotasExceeded) {
+    std::string ns = "public/test-backlog-quotas";
+    std::string topic = ns + "/testBacklogQuotasExceeded" + std::to_string(time(nullptr));
+
+    int res = makePutRequest(adminUrl + "admin/v2/persistent/" + topic + "/partitions", "5");
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+    LOG_INFO("Created topic " << topic << " with 5 partitions");
+
+    auto setBacklogPolicy = [&ns](const std::string& policy, int limitSize) {
+        const auto body =
+            R"({"policy":")" + policy + R"(","limitSize":)" + std::to_string(limitSize) + "}";
+        int res = makePostRequest(adminUrl + "admin/v2/namespaces/" + ns + "/backlogQuota", body);
+        LOG_INFO(res << " | Change the backlog policy to: " << body);
+        ASSERT_TRUE(res == 204 || res == 409);
+    };
+
+    Client client(serviceUrl);
+
+    // Create a topic with backlog size that is greater than 1024
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));  // create a cursor
+    Producer producer;
+
+    const auto partition = topic + "-partition-0";
+    ASSERT_EQ(ResultOk, client.createProducer(partition, producer));
+    ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(std::string(1024L, 'a')).build()));
+    ASSERT_EQ(ResultOk, producer.close());
+
+    setBacklogPolicy("producer_request_hold", 1024);
+    ASSERT_EQ(ResultProducerBlockedQuotaExceededError, client.createProducer(topic, producer));
+    ASSERT_EQ(ResultProducerBlockedQuotaExceededError, client.createProducer(partition, producer));
+
+    setBacklogPolicy("producer_exception", 1024);
+    ASSERT_EQ(ResultProducerBlockedQuotaExceededException, client.createProducer(topic, producer));
+    ASSERT_EQ(ResultProducerBlockedQuotaExceededException, client.createProducer(partition, producer));
+
+    setBacklogPolicy("consumer_backlog_eviction", 1024);
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+    ASSERT_EQ(ResultOk, client.createProducer(partition, producer));
+
+    client.close();
 }
\ No newline at end of file