You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/05/19 15:20:25 UTC

[pulsar] branch master updated: [C++] Release the unused spots of pending message queue (#6926)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5bb8809  [C++] Release the unused spots of pending message queue (#6926)
5bb8809 is described below

commit 5bb8809a911fcb804062d4209b3003ed0413499b
Author: Yunze Xu <xy...@gmail.com>
AuthorDate: Tue May 19 23:20:08 2020 +0800

    [C++] Release the unused spots of pending message queue (#6926)
    
    ### Motivation
    
    When messages are sent in a batch, every single message reserves one spot in the producer's pending message queue, but only one batched message is pushed to the queue. Therefore, many reserved spots may remain unused when a `ProducerQueueIsFull` error occurs.
    
    Besides, if a message is too big or fails to be encrypted, `sendAsync` fails immediately, but the reserved spot is not released.
    
    ### Modifications
    
    - Add a `bool` return value to `BatchMessageContainer::sendMessage` to indicate whether the batched message was pushed to the producer's queue.
    - Add a `bool` return value to `BatchMessageContainer::add` to indicate whether the producer should keep the reserved spot (`true`) or release it (`false`). The spot is retained only if the message is the first one in the batch and is not sent immediately; the spot is then released when the batched message is pushed to the queue.
    - Test sending a batch with a 2-spot pending message queue: one spot stores the batched message, and the other prevents a `ProducerQueueIsFull` error.
    - Test that, after all batched messages have been sent, the number of reserved spots in the producer's queue is 0.
    
    ### Verifying this change
    
    - [ ] Make sure that the change passes the CI checks.
    
    This change is already covered by existing tests, such as `BatchMessageTest.testSendCallback`, `BatchMessageTest.testPartitionedTopics` and `BasicEndToEndTest.testMessageTooBig`.
---
 pulsar-client-cpp/lib/BatchMessageContainer.cc | 40 ++++++++++++++++++++------
 pulsar-client-cpp/lib/BatchMessageContainer.h  |  9 ++++--
 pulsar-client-cpp/lib/BlockingQueue.h          |  2 ++
 pulsar-client-cpp/lib/ProducerImpl.cc          | 22 ++------------
 pulsar-client-cpp/tests/BasicEndToEndTest.cc   |  4 +++
 pulsar-client-cpp/tests/BatchMessageTest.cc    | 13 +++++++--
 pulsar-client-cpp/tests/PulsarFriend.h         | 16 +++++++++++
 7 files changed, 74 insertions(+), 32 deletions(-)

diff --git a/pulsar-client-cpp/lib/BatchMessageContainer.cc b/pulsar-client-cpp/lib/BatchMessageContainer.cc
index 7413d57..24c6f79 100644
--- a/pulsar-client-cpp/lib/BatchMessageContainer.cc
+++ b/pulsar-client-cpp/lib/BatchMessageContainer.cc
@@ -43,15 +43,24 @@ BatchMessageContainer::BatchMessageContainer(ProducerImpl& producer)
     LOG_INFO(*this << " BatchMessageContainer constructed");
 }
 
-void BatchMessageContainer::add(const Message& msg, SendCallback sendCallback, bool disableCheck) {
+bool BatchMessageContainer::add(const Message& msg, SendCallback sendCallback, bool disableCheck) {
     // disableCheck is needed to avoid recursion in case the batchSizeInKB < IndividualMessageSizeInKB
     LOG_DEBUG(*this << " Called add function for [message = " << msg << "] [disableCheck = " << disableCheck
                     << "]");
     if (!(disableCheck || hasSpaceInBatch(msg))) {
         LOG_DEBUG(*this << " Batch is full");
-        sendMessage(NULL);
-        add(msg, sendCallback, true);
-        return;
+        bool hasMessages = !messagesContainerListPtr_->empty();
+        bool pushedToPendingQueue = sendMessage(NULL);
+        bool result = add(msg, sendCallback, true);
+        if (hasMessages && !pushedToPendingQueue) {
+            // The msg failed to be pushed to the producer's queue, so the reserved spot before won't be
+            // released and we must return false to tell the producer to release the spot.
+            // Exceptionally, `hasSpaceInBatch` returns false just because `msg` is too big before compressed,
+            // while there're no messages before. In this case, the spots have already been released so we
+            // can't return false simply.
+            return false;
+        }
+        return result;
     }
     if (messagesContainerListPtr_->empty()) {
         // First message to be added
@@ -71,10 +80,18 @@ void BatchMessageContainer::add(const Message& msg, SendCallback sendCallback, b
 
     LOG_DEBUG(*this << " Number of messages in Batch = " << messagesContainerListPtr_->size());
     LOG_DEBUG(*this << " Batch Payload Size In Bytes = " << batchSizeInBytes_);
+    bool hasOnlyOneMessage = (messagesContainerListPtr_->size() == 1);
     if (isFull()) {
         LOG_DEBUG(*this << " Batch is full.");
-        sendMessage(NULL);
+        // If there're more than one messages in the batch, even if it was pushed to the queue successfully,
+        // we also returns false to release one spot, because there're two spots to be released. One is
+        // reserved when the first message arrived, another is reserved when the current message arrived.
+        bool pushedToPendingQueue = sendMessage(NULL);
+        return hasOnlyOneMessage && pushedToPendingQueue;
     }
+    // A batch of messages only need one spot, so returns false when more messages were added to the batch,
+    // then outer ProducerImpl::sendAsync() will release unnecessary reserved spots
+    return hasOnlyOneMessage;
 }
 
 void BatchMessageContainer::startTimer() {
@@ -85,7 +102,7 @@ void BatchMessageContainer::startTimer() {
         std::bind(&pulsar::ProducerImpl::batchMessageTimeoutHandler, &producer_, std::placeholders::_1));
 }
 
-void BatchMessageContainer::sendMessage(FlushCallback flushCallback) {
+bool BatchMessageContainer::sendMessage(FlushCallback flushCallback) {
     // Call this function after acquiring the ProducerImpl lock
     LOG_DEBUG(*this << "Sending the batch message container");
     if (isEmpty()) {
@@ -93,13 +110,17 @@ void BatchMessageContainer::sendMessage(FlushCallback flushCallback) {
         if (flushCallback) {
             flushCallback(ResultOk);
         }
-        return;
+        return false;
     }
     impl_->metadata.set_num_messages_in_batch(messagesContainerListPtr_->size());
     compressPayLoad();
 
     SharedBuffer encryptedPayload;
-    producer_.encryptMessage(impl_->metadata, impl_->payload, encryptedPayload);
+    if (!producer_.encryptMessage(impl_->metadata, impl_->payload, encryptedPayload)) {
+        batchMessageCallBack(ResultCryptoError, MessageId{}, messagesContainerListPtr_, nullptr);
+        clear();
+        return false;
+    }
     impl_->payload = encryptedPayload;
 
     if (impl_->payload.readableBytes() > producer_.keepMaxMessageSize_) {
@@ -107,7 +128,7 @@ void BatchMessageContainer::sendMessage(FlushCallback flushCallback) {
         // can only 1 single message in the batch at this point.
         batchMessageCallBack(ResultMessageTooBig, MessageId{}, messagesContainerListPtr_, nullptr);
         clear();
-        return;
+        return false;
     }
 
     Message msg;
@@ -119,6 +140,7 @@ void BatchMessageContainer::sendMessage(FlushCallback flushCallback) {
 
     producer_.sendMessage(msg, callback);
     clear();
+    return true;
 }
 
 void BatchMessageContainer::compressPayLoad() {
diff --git a/pulsar-client-cpp/lib/BatchMessageContainer.h b/pulsar-client-cpp/lib/BatchMessageContainer.h
index 93b4d81..cd6a3e1 100644
--- a/pulsar-client-cpp/lib/BatchMessageContainer.h
+++ b/pulsar-client-cpp/lib/BatchMessageContainer.h
@@ -58,7 +58,11 @@ class BatchMessageContainer {
 
     ~BatchMessageContainer();
 
-    void add(const Message& msg, SendCallback sendCallback, bool disableCheck = false);
+    // It was only called in ProducerImpl::sendAsync, while the producer has reserved a spot of pending
+    // message queue before.
+    // It returns true to tell the producer not to release the spot because the spot would be released when
+    // the batched message was pushed to the queue successfully in ProducerImpl::sendMessage.
+    bool add(const Message& msg, SendCallback sendCallback, bool disableCheck = false);
 
     SharedBuffer getBatchedPayload();
 
@@ -109,7 +113,8 @@ class BatchMessageContainer {
 
     void startTimer();
 
-    void sendMessage(FlushCallback callback);
+    // Returns true if a batch of messages was sent to producer's pending message queue
+    bool sendMessage(FlushCallback callback);
 };
 
 bool BatchMessageContainer::hasSpaceInBatch(const Message& msg) const {
diff --git a/pulsar-client-cpp/lib/BlockingQueue.h b/pulsar-client-cpp/lib/BlockingQueue.h
index 39f8511..1cdaf93 100644
--- a/pulsar-client-cpp/lib/BlockingQueue.h
+++ b/pulsar-client-cpp/lib/BlockingQueue.h
@@ -262,6 +262,8 @@ class BlockingQueue {
 
     iterator end() { return queue_.end(); }
 
+    int reservedSpots() const { return reservedSpots_; }
+
    private:
     void releaseReservedSpot() {
         Lock lock(mutex_);
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index a488a86..e8ba563 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -237,14 +237,6 @@ void ProducerImpl::failPendingMessages(Result result) {
     // without holding producer mutex.
     for (MessageQueue::const_iterator it = pendingMessagesQueue_.begin(); it != pendingMessagesQueue_.end();
          it++) {
-        // When dealing any failure message, if the current message is a batch one, we should also release
-        // the reserved spots in the pendingMessageQueue_, for all individual messages inside this batch
-        // message. See 'ProducerImpl::sendAsync' for more details.
-        if (it->msg_.impl_->metadata.has_num_messages_in_batch()) {
-            // batch message - need to release more spots
-            // -1 since the pushing batch message into the queue already released a spot
-            pendingMessagesQueue_.release(it->msg_.impl_->metadata.num_messages_in_batch() - 1);
-        }
         messagesToFail.push_back(*it);
     }
 
@@ -427,7 +419,9 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
 
     if (batchMessageContainer && !msg.impl_->metadata.has_deliver_at_time()) {
         // Batching is enabled and the message is not delayed
-        batchMessageContainer->add(msg, cb);
+        if (!batchMessageContainer->add(msg, cb)) {
+            pendingMessagesQueue_.release(1);
+        }
         return;
     }
     sendMessage(msg, cb);
@@ -611,11 +605,6 @@ bool ProducerImpl::removeCorruptMessage(uint64_t sequenceId) {
     } else {
         LOG_DEBUG(getName() << "Remove corrupt message from queue " << sequenceId);
         pendingMessagesQueue_.pop();
-        if (op.msg_.impl_->metadata.has_num_messages_in_batch()) {
-            // batch message - need to release more spots
-            // -1 since the pushing batch message into the queue already released a spot
-            pendingMessagesQueue_.release(op.msg_.impl_->metadata.num_messages_in_batch() - 1);
-        }
         lock.unlock();
         if (op.sendCallback_) {
             // to protect from client callback exception
@@ -657,11 +646,6 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
         // Message was persisted correctly
         LOG_DEBUG(getName() << "Received ack for msg " << sequenceId);
         pendingMessagesQueue_.pop();
-        if (op.msg_.impl_->metadata.has_num_messages_in_batch()) {
-            // batch message - need to release more spots
-            // -1 since the pushing batch message into the queue already released a spot
-            pendingMessagesQueue_.release(op.msg_.impl_->metadata.num_messages_in_batch() - 1);
-        }
 
         lastSequenceIdPublished_ = sequenceId + op.msg_.impl_->metadata.num_messages_in_batch() - 1;
 
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index b27d0c6..e50b67b 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -620,6 +620,10 @@ TEST(BasicEndToEndTest, testMessageTooBig) {
     result = producer.send(msg);
     ASSERT_EQ(ResultOk, result);
 
+    for (const auto &q : PulsarFriend::getProducerMessageQueue(producer, NonPartitioned)) {
+        ASSERT_EQ(0, q->reservedSpots());
+    }
+
     delete[] content;
 }
 
diff --git a/pulsar-client-cpp/tests/BatchMessageTest.cc b/pulsar-client-cpp/tests/BatchMessageTest.cc
index f9638f8..78ed248 100644
--- a/pulsar-client-cpp/tests/BatchMessageTest.cc
+++ b/pulsar-client-cpp/tests/BatchMessageTest.cc
@@ -919,6 +919,10 @@ TEST(BatchMessageTest, testPartitionedTopics) {
 
     // Number of messages consumed
     ASSERT_EQ(i, numOfMessages - globalPublishCountQueueFull);
+
+    for (const auto& q : PulsarFriend::getProducerMessageQueue(producer, Partitioned)) {
+        ASSERT_EQ(0, q->reservedSpots());
+    }
 }
 
 TEST(BatchMessageTest, producerFailureResult) {
@@ -993,10 +997,10 @@ TEST(BatchMessageTest, testSendCallback) {
     constexpr int numMessagesOfBatch = 3;
 
     ProducerConfiguration producerConfig;
-    producerConfig.setBatchingEnabled(5);
+    producerConfig.setBatchingEnabled(true);
     producerConfig.setBatchingMaxMessages(numMessagesOfBatch);
     producerConfig.setBatchingMaxPublishDelayMs(1000);  // 1 s, it's long enough for 3 messages batched
-    producerConfig.setMaxPendingMessages(numMessagesOfBatch);
+    producerConfig.setMaxPendingMessages(2);            // only 1 spot is actually used
 
     Producer producer;
     ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig, producer));
@@ -1022,11 +1026,16 @@ TEST(BatchMessageTest, testSendCallback) {
         Message msg;
         ASSERT_EQ(ResultOk, consumer.receive(msg));
         receivedIdSet.emplace(msg.getMessageId());
+        consumer.acknowledge(msg);
     }
 
     latch.wait();
     ASSERT_EQ(sentIdSet, receivedIdSet);
 
+    for (const auto& q : PulsarFriend::getProducerMessageQueue(producer, NonPartitioned)) {
+        ASSERT_EQ(0, q->reservedSpots());
+    }
+
     consumer.close();
     producer.close();
     client.close();
diff --git a/pulsar-client-cpp/tests/PulsarFriend.h b/pulsar-client-cpp/tests/PulsarFriend.h
index a50bd67..ea38881 100644
--- a/pulsar-client-cpp/tests/PulsarFriend.h
+++ b/pulsar-client-cpp/tests/PulsarFriend.h
@@ -18,6 +18,7 @@
  */
 
 #include <lib/ProducerImpl.h>
+#include <lib/PartitionedProducerImpl.h>
 #include <lib/ConsumerImpl.h>
 #include <string>
 
@@ -37,6 +38,21 @@ class PulsarFriend {
         return std::static_pointer_cast<ProducerStatsImpl>(producerImpl->producerStatsBasePtr_);
     }
 
+    static std::vector<ProducerImpl::MessageQueue*> getProducerMessageQueue(Producer producer,
+                                                                            ConsumerTopicType type) {
+        ProducerImplBasePtr producerBaseImpl = producer.impl_;
+        if (type == Partitioned) {
+            std::vector<ProducerImpl::MessageQueue*> queues;
+            for (const auto& producer :
+                 std::static_pointer_cast<PartitionedProducerImpl>(producerBaseImpl)->producers_) {
+                queues.emplace_back(&producer->pendingMessagesQueue_);
+            }
+            return queues;
+        } else {
+            return {&std::static_pointer_cast<ProducerImpl>(producerBaseImpl)->pendingMessagesQueue_};
+        }
+    }
+
     template <typename T>
     static unsigned long sum(std::map<T, unsigned long> m) {
         unsigned long sum = 0;