You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2023/10/22 14:50:23 UTC

[pulsar-client-cpp] branch main updated: Avoid blocking the message listener threads (#332)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 6daf7a5  Avoid blocking the message listener threads (#332)
6daf7a5 is described below

commit 6daf7a575039d11d7b4a27af474e940831e8ac97
Author: erobot <er...@users.noreply.github.com>
AuthorDate: Sun Oct 22 22:50:18 2023 +0800

    Avoid blocking the message listener threads (#332)
    
    ### Motivation
    
    The message listener thread blocks when the receiver queue of `MultiTopicsConsumerImpl` is full. As message listener threads are used by all consumers in the same `Client`, if one slow consumer blocks the listener threads, all other consumers can no longer receive new messages.
    
    ### Modifications
    
    1. Modify `MultiTopicsConsumerImpl` to use `UnboundedBlockingQueue` to avoid blocking
    2. Modify the permit update logic: increase permits only after messages are consumed from `MultiTopicsConsumerImpl`
---
 lib/ConsumerImpl.cc            | 14 ++++++++-
 lib/ConsumerImpl.h             |  1 +
 lib/MessageImpl.h              |  1 +
 lib/MultiTopicsConsumerImpl.cc | 16 +++++-----
 lib/MultiTopicsConsumerImpl.h  |  4 +--
 tests/ConsumerTest.cc          | 66 ++++++++++++++++++++++++++++++++++++++++++
 6 files changed, 92 insertions(+), 10 deletions(-)

diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 61def68..8dd334e 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -1032,7 +1032,9 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) {
         return;
     }
 
-    increaseAvailablePermits(currentCnx);
+    if (!hasParent_) {
+        increaseAvailablePermits(currentCnx);
+    }
     if (track) {
         trackMessage(msg.getMessageId());
     }
@@ -1089,6 +1091,16 @@ void ConsumerImpl::increaseAvailablePermits(const ClientConnectionPtr& currentCn
     }
 }
 
+void ConsumerImpl::increaseAvailablePermits(const Message& msg) {
+    ClientConnectionPtr currentCnx = getCnx().lock();
+    if (currentCnx && msg.impl_->cnx_ != currentCnx.get()) {
+        LOG_DEBUG(getName() << "Not adding permit since connection is different.");
+        return;
+    }
+
+    increaseAvailablePermits(currentCnx);
+}
+
 inline CommandSubscribe_SubType ConsumerImpl::getSubType() {
     ConsumerType type = config_.getConsumerType();
     switch (type) {
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 8d5bdcf..dd7163f 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -168,6 +168,7 @@ class ConsumerImpl : public ConsumerImplBase {
     void discardCorruptedMessage(const ClientConnectionPtr& cnx, const proto::MessageIdData& messageId,
                                  CommandAck_ValidationError validationError);
     void increaseAvailablePermits(const ClientConnectionPtr& currentCnx, int delta = 1);
+    void increaseAvailablePermits(const Message& msg);
     void drainIncomingMessageQueue(size_t count);
     uint32_t receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx, Message& batchedMessage,
                                                 const BitSet& ackSet, int redeliveryCount);
diff --git a/lib/MessageImpl.h b/lib/MessageImpl.h
index cc07c58..55b9612 100644
--- a/lib/MessageImpl.h
+++ b/lib/MessageImpl.h
@@ -47,6 +47,7 @@ class MessageImpl {
     int redeliveryCount_;
     bool hasSchemaVersion_;
     const std::string* schemaVersion_;
+    std::weak_ptr<class ConsumerImpl> consumerPtr_;
 
     const std::string& getPartitionKey() const;
     bool hasPartitionKey() const;
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index 5162a61..abc54c8 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -519,6 +519,7 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
     LOG_DEBUG("Received Message from one of the topic - " << consumer.getTopic()
                                                           << " message:" << msg.getDataAsString());
     msg.impl_->setTopicName(consumer.impl_->getTopicPtr());
+    msg.impl_->consumerPtr_ = std::static_pointer_cast<ConsumerImpl>(consumer.impl_);
 
     Lock lock(pendingReceiveMutex_);
     if (!pendingReceives_.empty()) {
@@ -530,18 +531,15 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
             auto self = weakSelf.lock();
             if (self) {
                 notifyPendingReceivedCallback(ResultOk, msg, callback);
+                auto consumer = msg.impl_->consumerPtr_.lock();
+                if (consumer) {
+                    consumer->increaseAvailablePermits(msg);
+                }
             }
         });
         return;
     }
 
-    if (incomingMessages_.full()) {
-        lock.unlock();
-    }
-
-    // add message to block queue.
-    // when messages queue is full, will block listener thread on ConsumerImpl,
-    // then will not send permits to broker, will broker stop push message.
     incomingMessages_.push(msg);
     incomingMessagesSize_.fetch_add(msg.getLength());
 
@@ -1072,6 +1070,10 @@ void MultiTopicsConsumerImpl::notifyBatchPendingReceivedCallback(const BatchRece
 void MultiTopicsConsumerImpl::messageProcessed(Message& msg) {
     incomingMessagesSize_.fetch_sub(msg.getLength());
     unAckedMessageTrackerPtr_->add(msg.getMessageId());
+    auto consumer = msg.impl_->consumerPtr_.lock();
+    if (consumer) {
+        consumer->increaseAvailablePermits(msg);
+    }
 }
 
 std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImpl::get_shared_this_ptr() {
diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h
index b00b0f2..d4127f6 100644
--- a/lib/MultiTopicsConsumerImpl.h
+++ b/lib/MultiTopicsConsumerImpl.h
@@ -24,7 +24,6 @@
 #include <memory>
 #include <vector>
 
-#include "BlockingQueue.h"
 #include "Commands.h"
 #include "ConsumerImplBase.h"
 #include "ConsumerInterceptors.h"
@@ -33,6 +32,7 @@
 #include "LookupDataResult.h"
 #include "SynchronizedHashMap.h"
 #include "TestUtil.h"
+#include "UnboundedBlockingQueue.h"
 
 namespace pulsar {
 typedef std::shared_ptr<Promise<Result, Consumer>> ConsumerSubResultPromisePtr;
@@ -115,7 +115,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
     std::map<std::string, int> topicsPartitions_;
     mutable std::mutex mutex_;
     std::mutex pendingReceiveMutex_;
-    BlockingQueue<Message> incomingMessages_;
+    UnboundedBlockingQueue<Message> incomingMessages_;
     std::atomic_int incomingMessagesSize_ = {0};
     MessageListener messageListener_;
     DeadlineTimerPtr partitionsUpdateTimer_;
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index 2d195fd..7917508 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -1335,4 +1335,70 @@ TEST(ConsumerTest, testRetrySubscribe) {
     // milliseconds
 }
 
+TEST(ConsumerTest, testNoListenerThreadBlocking) {
+    Client client{lookupUrl};
+
+    const int numPartitions = 2;
+    const std::string partitionedTopic = "testNoListenerThreadBlocking-" + std::to_string(time(nullptr));
+    int res =
+        makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions",
+                       std::to_string(numPartitions));
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+    const int receiverQueueSize = 1;
+    const int receiverQueueSizeAcrossPartitions = receiverQueueSize * numPartitions;
+
+    Consumer consumer1, consumer2;
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setReceiverQueueSize(receiverQueueSize);
+    consumerConfig.setMaxTotalReceiverQueueSizeAcrossPartitions(receiverQueueSizeAcrossPartitions);
+    Result consumerResult;
+    consumerResult = client.subscribe(partitionedTopic, "sub1", consumerConfig, consumer1);
+    ASSERT_EQ(consumerResult, ResultOk);
+    consumerResult = client.subscribe(partitionedTopic, "sub2", consumerConfig, consumer2);
+    ASSERT_EQ(consumerResult, ResultOk);
+
+    Producer producer;
+    ProducerConfiguration producerConfig;
+    producerConfig.setBatchingEnabled(false);
+    producerConfig.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
+    Result producerResult = client.createProducer(partitionedTopic, producerConfig, producer);
+    ASSERT_EQ(producerResult, ResultOk);
+
+    const int msgCount = receiverQueueSizeAcrossPartitions * 100;
+
+    for (int i = 0; i < msgCount; ++i) {
+        auto msg = MessageBuilder().setContent("test").build();
+        producer.sendAsync(msg, [](Result code, const MessageId& messageId) {});
+    }
+    producer.flush();
+    producer.close();
+
+    waitUntil(std::chrono::seconds(1), [consumer1] {
+        auto multiConsumerImpl = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer1);
+        return multiConsumerImpl->getNumOfPrefetchedMessages() == receiverQueueSizeAcrossPartitions;
+    });
+
+    // check consumer1 prefetch num
+    auto multiConsumerImpl = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer1);
+    int prefetchNum = multiConsumerImpl->getNumOfPrefetchedMessages();
+    ASSERT_LE(prefetchNum, receiverQueueSizeAcrossPartitions);
+
+    // read consumer2 while consumer1 reaches the prefech limit
+    for (int i = 0; i < msgCount; ++i) {
+        auto multiConsumerImpl = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer2);
+        int prefetchNum = multiConsumerImpl->getNumOfPrefetchedMessages();
+        ASSERT_LE(prefetchNum, receiverQueueSizeAcrossPartitions);
+
+        Message msg;
+        Result ret = consumer2.receive(msg, 1000);
+        ASSERT_EQ(ret, ResultOk);
+        consumer2.acknowledge(msg);
+    }
+
+    consumer2.close();
+    consumer1.close();
+    client.close();
+}
+
 }  // namespace pulsar