You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/07/27 05:37:32 UTC

[pulsar] 03/06: [C++] Fix send callback might not be invoked in key based batching (#14898)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f49047debfe5e04b4dacfd44b42e01b400e86936
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue Mar 29 01:45:14 2022 +0800

    [C++] Fix send callback might not be invoked in key based batching (#14898)
    
    * [C++] Fix send callback might not be invoked in key based batching
    
    ### Motivation
    
    When the C++ client enables key-based batching, a send callback may
    never be invoked. See
    https://github.com/apache/pulsar/blob/32df93f693bfdf42953bd728a12ecdea1796dcc8/pulsar-client-cpp/lib/ProducerImpl.cc#L272-L275
    
    If a batch container holds multiple batches, only one of them is
    processed during `closeAsync`, so the send callbacks of the other
    batches are never invoked. Even worse, the semaphore permits held by
    those other batches are never released.
    
    ### Modifications
    
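    - Add a method that processes every pending batch and then clears the
      batch container (in the final version, after the follow-up change
      noted below, this is `BatchMessageContainerBase::processAndClear()`).
      Call it from `batchMessageAndSend` and `getPendingCallbacksWhenFailed`
      so that no pending batch is dropped when the producer is closed via
      `closeAsync`.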
    - Add a test `testCloseBeforeSend` to verify that when a producer has
      multiple pending batches, all of their callbacks are invoked during
      `closeAsync`.
    
    * Add `processAndClear()` to the batch message container
    
    (cherry picked from commit f3295ff0b14526de27791493d4c45cf814ef3654)
---
 pulsar-client-cpp/lib/BatchMessageContainerBase.h | 26 +++++++++++++
 pulsar-client-cpp/lib/ProducerImpl.cc             | 47 ++++++-----------------
 pulsar-client-cpp/tests/KeyBasedBatchingTest.cc   | 32 ++++++++++++++-
 3 files changed, 69 insertions(+), 36 deletions(-)

diff --git a/pulsar-client-cpp/lib/BatchMessageContainerBase.h b/pulsar-client-cpp/lib/BatchMessageContainerBase.h
index 8a32d8e9dca..71eef5fab62 100644
--- a/pulsar-client-cpp/lib/BatchMessageContainerBase.h
+++ b/pulsar-client-cpp/lib/BatchMessageContainerBase.h
@@ -112,6 +112,9 @@ class BatchMessageContainerBase : public boost::noncopyable {
     bool hasEnoughSpace(const Message& msg) const noexcept;
     bool isEmpty() const noexcept;
 
+    void processAndClear(std::function<void(Result, const OpSendMsg&)> opSendMsgCallback,
+                         FlushCallback flushCallback);
+
    protected:
     // references to ProducerImpl's fields
     const std::string& topicName_;
@@ -157,6 +160,29 @@ inline void BatchMessageContainerBase::resetStats() {
     sizeInBytes_ = 0;
 }
 
+inline void BatchMessageContainerBase::processAndClear(
+    std::function<void(Result, const OpSendMsg&)> opSendMsgCallback, FlushCallback flushCallback) {
+    if (isEmpty()) {
+        if (flushCallback) {
+            flushCallback(ResultOk);
+        }
+    } else {
+        const auto numBatches = getNumBatches();
+        if (numBatches == 1) {
+            OpSendMsg opSendMsg;
+            Result result = createOpSendMsg(opSendMsg, flushCallback);
+            opSendMsgCallback(result, opSendMsg);
+        } else if (numBatches > 1) {
+            std::vector<OpSendMsg> opSendMsgs;
+            std::vector<Result> results = createOpSendMsgs(opSendMsgs, flushCallback);
+            for (size_t i = 0; i < results.size(); i++) {
+                opSendMsgCallback(results[i], opSendMsgs[i]);
+            }
+        }  // else numBatches is 0, do nothing
+    }
+    clear();
+}
+
 inline std::ostream& operator<<(std::ostream& os, const BatchMessageContainerBase& container) {
     container.serialize(os);
     return os;
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index e9812d46054..e15d388ef64 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -268,13 +268,14 @@ std::shared_ptr<ProducerImpl::PendingCallbacks> ProducerImpl::getPendingCallback
     }
 
     if (batchMessageContainer_) {
-        OpSendMsg opSendMsg;
-        if (batchMessageContainer_->createOpSendMsg(opSendMsg) == ResultOk) {
-            callbacks->opSendMsgs.emplace_back(opSendMsg);
-        }
-
-        releaseSemaphoreForSendOp(opSendMsg);
-        batchMessageContainer_->clear();
+        batchMessageContainer_->processAndClear(
+            [this, &callbacks](Result result, const OpSendMsg& opSendMsg) {
+                if (result == ResultOk) {
+                    callbacks->opSendMsgs.emplace_back(opSendMsg);
+                }
+                releaseSemaphoreForSendOp(opSendMsg);
+            },
+            nullptr);
     }
     pendingMessagesQueue_.clear();
 
@@ -507,15 +508,8 @@ PendingFailures ProducerImpl::batchMessageAndSend(const FlushCallback& flushCall
     LOG_DEBUG("batchMessageAndSend " << *batchMessageContainer_);
     batchTimer_->cancel();
 
-    if (PULSAR_UNLIKELY(batchMessageContainer_->isEmpty())) {
-        if (flushCallback) {
-            flushCallback(ResultOk);
-        }
-    } else {
-        const size_t numBatches = batchMessageContainer_->getNumBatches();
-        if (numBatches == 1) {
-            OpSendMsg opSendMsg;
-            Result result = batchMessageContainer_->createOpSendMsg(opSendMsg, flushCallback);
+    batchMessageContainer_->processAndClear(
+        [this, &failures](Result result, const OpSendMsg& opSendMsg) {
             if (result == ResultOk) {
                 sendMessage(opSendMsg);
             } else {
@@ -525,25 +519,8 @@ PendingFailures ProducerImpl::batchMessageAndSend(const FlushCallback& flushCall
                 releaseSemaphoreForSendOp(opSendMsg);
                 failures.add(std::bind(opSendMsg.sendCallback_, result, MessageId{}));
             }
-        } else if (numBatches > 1) {
-            std::vector<OpSendMsg> opSendMsgs;
-            std::vector<Result> results = batchMessageContainer_->createOpSendMsgs(opSendMsgs, flushCallback);
-            for (size_t i = 0; i < results.size(); i++) {
-                if (results[i] == ResultOk) {
-                    sendMessage(opSendMsgs[i]);
-                } else {
-                    // A spot has been reserved for this batch, but the batch failed to be pushed to the
-                    // queue, so we need to release the spot manually
-                    LOG_ERROR("batchMessageAndSend | Failed to createOpSendMsgs[" << i
-                                                                                  << "]: " << results[i]);
-                    releaseSemaphoreForSendOp(opSendMsgs[i]);
-                    failures.add(std::bind(opSendMsgs[i].sendCallback_, results[i], MessageId{}));
-                }
-            }
-        }  // else numBatches is 0, do nothing
-    }
-
-    batchMessageContainer_->clear();
+        },
+        flushCallback);
     return failures;
 }
 
diff --git a/pulsar-client-cpp/tests/KeyBasedBatchingTest.cc b/pulsar-client-cpp/tests/KeyBasedBatchingTest.cc
index 7b39554806b..d22152f4d81 100644
--- a/pulsar-client-cpp/tests/KeyBasedBatchingTest.cc
+++ b/pulsar-client-cpp/tests/KeyBasedBatchingTest.cc
@@ -41,7 +41,6 @@ class KeyBasedBatchingTest : public ::testing::Test {
 
     void TearDown() override { client_.close(); }
 
-    void setTopicName(const std::string& topicName) { topicName_ = topicName; }
     void initTopicName(const std::string& testName) {
         topicName_ = "KeyBasedBatchingTest-" + testName + "-" + std::to_string(time(nullptr));
     }
@@ -178,3 +177,34 @@ TEST_F(KeyBasedBatchingTest, testSingleBatch) {
     ASSERT_EQ(ResultTimeout, consumer_.receive(msg, 3000));
     ASSERT_EQ(numMessageSent.load(), numMessages);
 }
+
+TEST_F(KeyBasedBatchingTest, testCloseBeforeSend) {
+    initTopicName("CloseBeforeSend");
+    // Any asynchronous send won't be completed unless `close()` or `flush()` is triggered
+    initProducer(createDefaultProducerConfig().setBatchingMaxMessages(static_cast<unsigned>(-1)));
+
+    std::mutex mtx;
+    std::vector<Result> results;
+    auto saveResult = [&mtx, &results](Result result) {
+        std::lock_guard<std::mutex> lock(mtx);
+        results.emplace_back(result);
+    };
+    auto sendAsync = [saveResult, this](const std::string& key, const std::string& value) {
+        producer_.sendAsync(MessageBuilder().setOrderingKey(key).setContent(value).build(),
+                            [saveResult](Result result, const MessageId& id) { saveResult(result); });
+    };
+
+    constexpr int numKeys = 10;
+    for (int i = 0; i < numKeys; i++) {
+        sendAsync("key-" + std::to_string(i), "value");
+    }
+
+    ASSERT_EQ(ResultOk, producer_.close());
+
+    // After close() completed, all callbacks should have failed with ResultAlreadyClosed
+    std::lock_guard<std::mutex> lock(mtx);
+    ASSERT_EQ(results.size(), numKeys);
+    for (int i = 0; i < numKeys; i++) {
+        ASSERT_EQ(results[i], ResultAlreadyClosed) << " results[" << i << "] is " << results[i];
+    }
+}