You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/05/07 16:01:07 UTC

[pulsar] branch master updated: [C++] Avoid sending flow requests with zero permits (#10506)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 37583f6  [C++] Avoid sending flow requests with zero permits (#10506)
37583f6 is described below

commit 37583f61aac166976dee1c843848e3c6041579ec
Author: Yunze Xu <xy...@163.com>
AuthorDate: Fri May 7 23:59:38 2021 +0800

    [C++] Avoid sending flow requests with zero permits (#10506)
    
    * Avoid sending zero permits
    
    * Avoid the same topic name
---
 pulsar-client-cpp/lib/ConsumerImpl.cc        |  2 +-
 pulsar-client-cpp/tests/ZeroQueueSizeTest.cc | 61 ++++++++++++++++++++++++++++
 2 files changed, 62 insertions(+), 1 deletion(-)

diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 949d59c..33ed682 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -204,7 +204,7 @@ void ConsumerImpl::connectionFailed(Result result) {
 }
 
 void ConsumerImpl::sendFlowPermitsToBroker(const ClientConnectionPtr& cnx, int numMessages) {
-    if (cnx) {
+    if (cnx && numMessages > 0) {
         LOG_DEBUG(getName() << "Send more permits: " << numMessages);
         SharedBuffer cmd = Commands::newFlow(consumerId_, static_cast<unsigned int>(numMessages));
         cnx->sendCommand(cmd);
diff --git a/pulsar-client-cpp/tests/ZeroQueueSizeTest.cc b/pulsar-client-cpp/tests/ZeroQueueSizeTest.cc
index 6d6c933..59780fe 100644
--- a/pulsar-client-cpp/tests/ZeroQueueSizeTest.cc
+++ b/pulsar-client-cpp/tests/ZeroQueueSizeTest.cc
@@ -223,3 +223,64 @@ TEST(ZeroQueueSizeTest, testPauseResume) {
 
     client.close();
 }
+
+TEST(ZeroQueueSizeTest, testPauseResumeNoReconnection) {
+    Client client(lookupUrl);
+    const auto topic = "ZeroQueueSizeTestPauseResumeNoReconnection-" + std::to_string(time(nullptr));
+
+    std::mutex mtx;
+    std::condition_variable cond;
+    bool running = true;
+
+    auto notify = [&mtx, &cond, &running] {
+        std::unique_lock<std::mutex> lock(mtx);
+        running = false;
+        cond.notify_all();
+    };
+    auto wait = [&mtx, &cond, &running] {
+        std::unique_lock<std::mutex> lock(mtx);
+        running = true;
+        while (running) {
+            cond.wait(lock);
+        }
+    };
+
+    std::mutex mtxForMessages;
+    std::vector<std::string> receivedMessages;
+
+    ConsumerConfiguration consumerConf;
+    consumerConf.setReceiverQueueSize(0);
+    consumerConf.setMessageListener(
+        [&mtxForMessages, &receivedMessages, &notify](Consumer consumer, const Message& msg) {
+            std::unique_lock<std::mutex> lock(mtxForMessages);
+            receivedMessages.emplace_back(msg.getDataAsString());
+            lock.unlock();
+            consumer.acknowledge(msg);
+            notify();  // notify the consumer that a new message arrived
+        });
+
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "my-sub", consumerConf, consumer));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk,
+              client.createProducer(topic, ProducerConfiguration().setBatchingEnabled(false), producer));
+
+    constexpr int numMessages = 300;
+    for (int i = 0; i < numMessages; i++) {
+        const auto message = MessageBuilder().setContent(std::to_string(i)).build();
+        consumer.resumeMessageListener();
+        producer.sendAsync(message, {});
+        wait();  // wait until a new message is received
+        consumer.pauseMessageListener();
+    }
+
+    std::unique_lock<std::mutex> lock(mtxForMessages);
+    ASSERT_EQ(receivedMessages.size(), numMessages);
+    for (int i = 0; i < numMessages; i++) {
+        ASSERT_EQ(i, std::stoi(receivedMessages[i]));
+    }
+    lock.unlock();
+
+    client.close();
+}