You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/05/07 16:01:07 UTC
[pulsar] branch master updated: [C++] Avoid sending flow requests
with zero permits (#10506)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 37583f6 [C++] Avoid sending flow requests with zero permits (#10506)
37583f6 is described below
commit 37583f61aac166976dee1c843848e3c6041579ec
Author: Yunze Xu <xy...@163.com>
AuthorDate: Fri May 7 23:59:38 2021 +0800
[C++] Avoid sending flow requests with zero permits (#10506)
* Avoid sending zero permits
* Avoid the same topic name
---
pulsar-client-cpp/lib/ConsumerImpl.cc | 2 +-
pulsar-client-cpp/tests/ZeroQueueSizeTest.cc | 61 ++++++++++++++++++++++++++++
2 files changed, 62 insertions(+), 1 deletion(-)
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 949d59c..33ed682 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -204,7 +204,7 @@ void ConsumerImpl::connectionFailed(Result result) {
}
void ConsumerImpl::sendFlowPermitsToBroker(const ClientConnectionPtr& cnx, int numMessages) {
- if (cnx) {
+ if (cnx && numMessages > 0) {
LOG_DEBUG(getName() << "Send more permits: " << numMessages);
SharedBuffer cmd = Commands::newFlow(consumerId_, static_cast<unsigned int>(numMessages));
cnx->sendCommand(cmd);
diff --git a/pulsar-client-cpp/tests/ZeroQueueSizeTest.cc b/pulsar-client-cpp/tests/ZeroQueueSizeTest.cc
index 6d6c933..59780fe 100644
--- a/pulsar-client-cpp/tests/ZeroQueueSizeTest.cc
+++ b/pulsar-client-cpp/tests/ZeroQueueSizeTest.cc
@@ -223,3 +223,64 @@ TEST(ZeroQueueSizeTest, testPauseResume) {
client.close();
}
+
+TEST(ZeroQueueSizeTest, testPauseResumeNoReconnection) {
+ Client client(lookupUrl);
+ const auto topic = "ZeroQueueSizeTestPauseResumeNoReconnection-" + std::to_string(time(nullptr));
+
+ std::mutex mtx;
+ std::condition_variable cond;
+ bool running = true;
+
+ auto notify = [&mtx, &cond, &running] {
+ std::unique_lock<std::mutex> lock(mtx);
+ running = false;
+ cond.notify_all();
+ };
+ auto wait = [&mtx, &cond, &running] {
+ std::unique_lock<std::mutex> lock(mtx);
+ running = true;
+ while (running) {
+ cond.wait(lock);
+ }
+ };
+
+ std::mutex mtxForMessages;
+ std::vector<std::string> receivedMessages;
+
+ ConsumerConfiguration consumerConf;
+ consumerConf.setReceiverQueueSize(0);
+ consumerConf.setMessageListener(
+ [&mtxForMessages, &receivedMessages, &notify](Consumer consumer, const Message& msg) {
+ std::unique_lock<std::mutex> lock(mtxForMessages);
+ receivedMessages.emplace_back(msg.getDataAsString());
+ lock.unlock();
+ consumer.acknowledge(msg);
+ notify(); // notify the consumer that a new message arrived
+ });
+
+ Consumer consumer;
+ ASSERT_EQ(ResultOk, client.subscribe(topic, "my-sub", consumerConf, consumer));
+
+ Producer producer;
+ ASSERT_EQ(ResultOk,
+ client.createProducer(topic, ProducerConfiguration().setBatchingEnabled(false), producer));
+
+ constexpr int numMessages = 300;
+ for (int i = 0; i < numMessages; i++) {
+ const auto message = MessageBuilder().setContent(std::to_string(i)).build();
+ consumer.resumeMessageListener();
+ producer.sendAsync(message, {});
+ wait(); // wait until a new message is received
+ consumer.pauseMessageListener();
+ }
+
+ std::unique_lock<std::mutex> lock(mtxForMessages);
+ ASSERT_EQ(receivedMessages.size(), numMessages);
+ for (int i = 0; i < numMessages; i++) {
+ ASSERT_EQ(i, std::stoi(receivedMessages[i]));
+ }
+ lock.unlock();
+
+ client.close();
+}