You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2023/10/22 14:50:23 UTC
[pulsar-client-cpp] branch main updated: Avoid blocking the message listener threads (#332)
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 6daf7a5 Avoid blocking the message listener threads (#332)
6daf7a5 is described below
commit 6daf7a575039d11d7b4a27af474e940831e8ac97
Author: erobot <er...@users.noreply.github.com>
AuthorDate: Sun Oct 22 22:50:18 2023 +0800
Avoid blocking the message listener threads (#332)
### Motivation
The message listener thread blocks when the receiver queue of `MultiTopicsConsumerImpl` is full. As message listener threads are used by all consumers in the same `Client`, if one slow consumer blocks the listener threads, all other consumers can no longer receive new messages.
### Modifications
1. Modify `MultiTopicsConsumerImpl` to use `UnboundedBlockingQueue` to avoid blocking
2. Modify the permit update logic: increase permits only after messages are consumed from `MultiTopicsConsumerImpl`
---
lib/ConsumerImpl.cc | 14 ++++++++-
lib/ConsumerImpl.h | 1 +
lib/MessageImpl.h | 1 +
lib/MultiTopicsConsumerImpl.cc | 16 +++++-----
lib/MultiTopicsConsumerImpl.h | 4 +--
tests/ConsumerTest.cc | 66 ++++++++++++++++++++++++++++++++++++++++++
6 files changed, 92 insertions(+), 10 deletions(-)
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 61def68..8dd334e 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -1032,7 +1032,9 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) {
return;
}
- increaseAvailablePermits(currentCnx);
+ if (!hasParent_) {
+ increaseAvailablePermits(currentCnx);
+ }
if (track) {
trackMessage(msg.getMessageId());
}
@@ -1089,6 +1091,16 @@ void ConsumerImpl::increaseAvailablePermits(const ClientConnectionPtr& currentCn
}
}
+void ConsumerImpl::increaseAvailablePermits(const Message& msg) {
+ ClientConnectionPtr currentCnx = getCnx().lock();
+ if (currentCnx && msg.impl_->cnx_ != currentCnx.get()) {
+ LOG_DEBUG(getName() << "Not adding permit since connection is different.");
+ return;
+ }
+
+ increaseAvailablePermits(currentCnx);
+}
+
inline CommandSubscribe_SubType ConsumerImpl::getSubType() {
ConsumerType type = config_.getConsumerType();
switch (type) {
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 8d5bdcf..dd7163f 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -168,6 +168,7 @@ class ConsumerImpl : public ConsumerImplBase {
void discardCorruptedMessage(const ClientConnectionPtr& cnx, const proto::MessageIdData& messageId,
CommandAck_ValidationError validationError);
void increaseAvailablePermits(const ClientConnectionPtr& currentCnx, int delta = 1);
+ void increaseAvailablePermits(const Message& msg);
void drainIncomingMessageQueue(size_t count);
uint32_t receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx, Message& batchedMessage,
const BitSet& ackSet, int redeliveryCount);
diff --git a/lib/MessageImpl.h b/lib/MessageImpl.h
index cc07c58..55b9612 100644
--- a/lib/MessageImpl.h
+++ b/lib/MessageImpl.h
@@ -47,6 +47,7 @@ class MessageImpl {
int redeliveryCount_;
bool hasSchemaVersion_;
const std::string* schemaVersion_;
+ std::weak_ptr<class ConsumerImpl> consumerPtr_;
const std::string& getPartitionKey() const;
bool hasPartitionKey() const;
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index 5162a61..abc54c8 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -519,6 +519,7 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
LOG_DEBUG("Received Message from one of the topic - " << consumer.getTopic()
<< " message:" << msg.getDataAsString());
msg.impl_->setTopicName(consumer.impl_->getTopicPtr());
+ msg.impl_->consumerPtr_ = std::static_pointer_cast<ConsumerImpl>(consumer.impl_);
Lock lock(pendingReceiveMutex_);
if (!pendingReceives_.empty()) {
@@ -530,18 +531,15 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
auto self = weakSelf.lock();
if (self) {
notifyPendingReceivedCallback(ResultOk, msg, callback);
+ auto consumer = msg.impl_->consumerPtr_.lock();
+ if (consumer) {
+ consumer->increaseAvailablePermits(msg);
+ }
}
});
return;
}
- if (incomingMessages_.full()) {
- lock.unlock();
- }
-
- // add message to block queue.
- // when messages queue is full, will block listener thread on ConsumerImpl,
- // then will not send permits to broker, will broker stop push message.
incomingMessages_.push(msg);
incomingMessagesSize_.fetch_add(msg.getLength());
@@ -1072,6 +1070,10 @@ void MultiTopicsConsumerImpl::notifyBatchPendingReceivedCallback(const BatchRece
void MultiTopicsConsumerImpl::messageProcessed(Message& msg) {
incomingMessagesSize_.fetch_sub(msg.getLength());
unAckedMessageTrackerPtr_->add(msg.getMessageId());
+ auto consumer = msg.impl_->consumerPtr_.lock();
+ if (consumer) {
+ consumer->increaseAvailablePermits(msg);
+ }
}
std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImpl::get_shared_this_ptr() {
diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h
index b00b0f2..d4127f6 100644
--- a/lib/MultiTopicsConsumerImpl.h
+++ b/lib/MultiTopicsConsumerImpl.h
@@ -24,7 +24,6 @@
#include <memory>
#include <vector>
-#include "BlockingQueue.h"
#include "Commands.h"
#include "ConsumerImplBase.h"
#include "ConsumerInterceptors.h"
@@ -33,6 +32,7 @@
#include "LookupDataResult.h"
#include "SynchronizedHashMap.h"
#include "TestUtil.h"
+#include "UnboundedBlockingQueue.h"
namespace pulsar {
typedef std::shared_ptr<Promise<Result, Consumer>> ConsumerSubResultPromisePtr;
@@ -115,7 +115,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
std::map<std::string, int> topicsPartitions_;
mutable std::mutex mutex_;
std::mutex pendingReceiveMutex_;
- BlockingQueue<Message> incomingMessages_;
+ UnboundedBlockingQueue<Message> incomingMessages_;
std::atomic_int incomingMessagesSize_ = {0};
MessageListener messageListener_;
DeadlineTimerPtr partitionsUpdateTimer_;
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index 2d195fd..7917508 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -1335,4 +1335,70 @@ TEST(ConsumerTest, testRetrySubscribe) {
// milliseconds
}
+TEST(ConsumerTest, testNoListenerThreadBlocking) {
+ Client client{lookupUrl};
+
+ const int numPartitions = 2;
+ const std::string partitionedTopic = "testNoListenerThreadBlocking-" + std::to_string(time(nullptr));
+ int res =
+ makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions",
+ std::to_string(numPartitions));
+ ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+ const int receiverQueueSize = 1;
+ const int receiverQueueSizeAcrossPartitions = receiverQueueSize * numPartitions;
+
+ Consumer consumer1, consumer2;
+ ConsumerConfiguration consumerConfig;
+ consumerConfig.setReceiverQueueSize(receiverQueueSize);
+ consumerConfig.setMaxTotalReceiverQueueSizeAcrossPartitions(receiverQueueSizeAcrossPartitions);
+ Result consumerResult;
+ consumerResult = client.subscribe(partitionedTopic, "sub1", consumerConfig, consumer1);
+ ASSERT_EQ(consumerResult, ResultOk);
+ consumerResult = client.subscribe(partitionedTopic, "sub2", consumerConfig, consumer2);
+ ASSERT_EQ(consumerResult, ResultOk);
+
+ Producer producer;
+ ProducerConfiguration producerConfig;
+ producerConfig.setBatchingEnabled(false);
+ producerConfig.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
+ Result producerResult = client.createProducer(partitionedTopic, producerConfig, producer);
+ ASSERT_EQ(producerResult, ResultOk);
+
+ const int msgCount = receiverQueueSizeAcrossPartitions * 100;
+
+ for (int i = 0; i < msgCount; ++i) {
+ auto msg = MessageBuilder().setContent("test").build();
+ producer.sendAsync(msg, [](Result code, const MessageId& messageId) {});
+ }
+ producer.flush();
+ producer.close();
+
+ waitUntil(std::chrono::seconds(1), [consumer1] {
+ auto multiConsumerImpl = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer1);
+ return multiConsumerImpl->getNumOfPrefetchedMessages() == receiverQueueSizeAcrossPartitions;
+ });
+
+ // check consumer1 prefetch num
+ auto multiConsumerImpl = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer1);
+ int prefetchNum = multiConsumerImpl->getNumOfPrefetchedMessages();
+ ASSERT_LE(prefetchNum, receiverQueueSizeAcrossPartitions);
+
+ // read consumer2 while consumer1 reaches the prefech limit
+ for (int i = 0; i < msgCount; ++i) {
+ auto multiConsumerImpl = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer2);
+ int prefetchNum = multiConsumerImpl->getNumOfPrefetchedMessages();
+ ASSERT_LE(prefetchNum, receiverQueueSizeAcrossPartitions);
+
+ Message msg;
+ Result ret = consumer2.receive(msg, 1000);
+ ASSERT_EQ(ret, ResultOk);
+ consumer2.acknowledge(msg);
+ }
+
+ consumer2.close();
+ consumer1.close();
+ client.close();
+}
+
} // namespace pulsar