You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/07/27 05:37:32 UTC
[pulsar] 03/06: [C++] Fix send callback might not be invoked in key based batching (#14898)
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f49047debfe5e04b4dacfd44b42e01b400e86936
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue Mar 29 01:45:14 2022 +0800
[C++] Fix send callback might not be invoked in key based batching (#14898)
* [C++] Fix send callback might not be invoked in key based batching
### Motivation
When the C++ client enables key-based batching, there is a chance that the
send callback is never invoked. See
https://github.com/apache/pulsar/blob/32df93f693bfdf42953bd728a12ecdea1796dcc8/pulsar-client-cpp/lib/ProducerImpl.cc#L272-L275
If a batch container holds multiple batches, only one of them is
processed during `closeAsync`. Even worse, the semaphores of the other
batches are never released.
### Modifications
- Add a `processAndClear` method (first proposed as `clearPendingBatches`)
to the batch message container that processes all pending batches and
then clears them. Call it in `getPendingCallbacksWhenFailed` and
`batchMessageAndSend`.
- Add a test `testCloseBeforeSend` to verify that when a producer has
multiple pending batches, all callbacks are invoked in
`closeAsync`.
* Add processAndClear() to the batch message container
(cherry picked from commit f3295ff0b14526de27791493d4c45cf814ef3654)
---
pulsar-client-cpp/lib/BatchMessageContainerBase.h | 26 +++++++++++++
pulsar-client-cpp/lib/ProducerImpl.cc | 47 ++++++-----------------
pulsar-client-cpp/tests/KeyBasedBatchingTest.cc | 32 ++++++++++++++-
3 files changed, 69 insertions(+), 36 deletions(-)
diff --git a/pulsar-client-cpp/lib/BatchMessageContainerBase.h b/pulsar-client-cpp/lib/BatchMessageContainerBase.h
index 8a32d8e9dca..71eef5fab62 100644
--- a/pulsar-client-cpp/lib/BatchMessageContainerBase.h
+++ b/pulsar-client-cpp/lib/BatchMessageContainerBase.h
@@ -112,6 +112,9 @@ class BatchMessageContainerBase : public boost::noncopyable {
bool hasEnoughSpace(const Message& msg) const noexcept;
bool isEmpty() const noexcept;
+ void processAndClear(std::function<void(Result, const OpSendMsg&)> opSendMsgCallback,
+ FlushCallback flushCallback);
+
protected:
// references to ProducerImpl's fields
const std::string& topicName_;
@@ -157,6 +160,29 @@ inline void BatchMessageContainerBase::resetStats() {
sizeInBytes_ = 0;
}
+inline void BatchMessageContainerBase::processAndClear(
+ std::function<void(Result, const OpSendMsg&)> opSendMsgCallback, FlushCallback flushCallback) {
+ if (isEmpty()) {
+ if (flushCallback) {
+ flushCallback(ResultOk);
+ }
+ } else {
+ const auto numBatches = getNumBatches();
+ if (numBatches == 1) {
+ OpSendMsg opSendMsg;
+ Result result = createOpSendMsg(opSendMsg, flushCallback);
+ opSendMsgCallback(result, opSendMsg);
+ } else if (numBatches > 1) {
+ std::vector<OpSendMsg> opSendMsgs;
+ std::vector<Result> results = createOpSendMsgs(opSendMsgs, flushCallback);
+ for (size_t i = 0; i < results.size(); i++) {
+ opSendMsgCallback(results[i], opSendMsgs[i]);
+ }
+ } // else numBatches is 0, do nothing
+ }
+ clear();
+}
+
inline std::ostream& operator<<(std::ostream& os, const BatchMessageContainerBase& container) {
container.serialize(os);
return os;
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index e9812d46054..e15d388ef64 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -268,13 +268,14 @@ std::shared_ptr<ProducerImpl::PendingCallbacks> ProducerImpl::getPendingCallback
}
if (batchMessageContainer_) {
- OpSendMsg opSendMsg;
- if (batchMessageContainer_->createOpSendMsg(opSendMsg) == ResultOk) {
- callbacks->opSendMsgs.emplace_back(opSendMsg);
- }
-
- releaseSemaphoreForSendOp(opSendMsg);
- batchMessageContainer_->clear();
+ batchMessageContainer_->processAndClear(
+ [this, &callbacks](Result result, const OpSendMsg& opSendMsg) {
+ if (result == ResultOk) {
+ callbacks->opSendMsgs.emplace_back(opSendMsg);
+ }
+ releaseSemaphoreForSendOp(opSendMsg);
+ },
+ nullptr);
}
pendingMessagesQueue_.clear();
@@ -507,15 +508,8 @@ PendingFailures ProducerImpl::batchMessageAndSend(const FlushCallback& flushCall
LOG_DEBUG("batchMessageAndSend " << *batchMessageContainer_);
batchTimer_->cancel();
- if (PULSAR_UNLIKELY(batchMessageContainer_->isEmpty())) {
- if (flushCallback) {
- flushCallback(ResultOk);
- }
- } else {
- const size_t numBatches = batchMessageContainer_->getNumBatches();
- if (numBatches == 1) {
- OpSendMsg opSendMsg;
- Result result = batchMessageContainer_->createOpSendMsg(opSendMsg, flushCallback);
+ batchMessageContainer_->processAndClear(
+ [this, &failures](Result result, const OpSendMsg& opSendMsg) {
if (result == ResultOk) {
sendMessage(opSendMsg);
} else {
@@ -525,25 +519,8 @@ PendingFailures ProducerImpl::batchMessageAndSend(const FlushCallback& flushCall
releaseSemaphoreForSendOp(opSendMsg);
failures.add(std::bind(opSendMsg.sendCallback_, result, MessageId{}));
}
- } else if (numBatches > 1) {
- std::vector<OpSendMsg> opSendMsgs;
- std::vector<Result> results = batchMessageContainer_->createOpSendMsgs(opSendMsgs, flushCallback);
- for (size_t i = 0; i < results.size(); i++) {
- if (results[i] == ResultOk) {
- sendMessage(opSendMsgs[i]);
- } else {
- // A spot has been reserved for this batch, but the batch failed to be pushed to the
- // queue, so we need to release the spot manually
- LOG_ERROR("batchMessageAndSend | Failed to createOpSendMsgs[" << i
- << "]: " << results[i]);
- releaseSemaphoreForSendOp(opSendMsgs[i]);
- failures.add(std::bind(opSendMsgs[i].sendCallback_, results[i], MessageId{}));
- }
- }
- } // else numBatches is 0, do nothing
- }
-
- batchMessageContainer_->clear();
+ },
+ flushCallback);
return failures;
}
diff --git a/pulsar-client-cpp/tests/KeyBasedBatchingTest.cc b/pulsar-client-cpp/tests/KeyBasedBatchingTest.cc
index 7b39554806b..d22152f4d81 100644
--- a/pulsar-client-cpp/tests/KeyBasedBatchingTest.cc
+++ b/pulsar-client-cpp/tests/KeyBasedBatchingTest.cc
@@ -41,7 +41,6 @@ class KeyBasedBatchingTest : public ::testing::Test {
void TearDown() override { client_.close(); }
- void setTopicName(const std::string& topicName) { topicName_ = topicName; }
void initTopicName(const std::string& testName) {
topicName_ = "KeyBasedBatchingTest-" + testName + "-" + std::to_string(time(nullptr));
}
@@ -178,3 +177,34 @@ TEST_F(KeyBasedBatchingTest, testSingleBatch) {
ASSERT_EQ(ResultTimeout, consumer_.receive(msg, 3000));
ASSERT_EQ(numMessageSent.load(), numMessages);
}
+
+TEST_F(KeyBasedBatchingTest, testCloseBeforeSend) {
+ initTopicName("CloseBeforeSend");
+ // Any asynchronous send won't be completed unless `close()` or `flush()` is triggered
+ initProducer(createDefaultProducerConfig().setBatchingMaxMessages(static_cast<unsigned>(-1)));
+
+ std::mutex mtx;
+ std::vector<Result> results;
+ auto saveResult = [&mtx, &results](Result result) {
+ std::lock_guard<std::mutex> lock(mtx);
+ results.emplace_back(result);
+ };
+ auto sendAsync = [saveResult, this](const std::string& key, const std::string& value) {
+ producer_.sendAsync(MessageBuilder().setOrderingKey(key).setContent(value).build(),
+ [saveResult](Result result, const MessageId& id) { saveResult(result); });
+ };
+
+ constexpr int numKeys = 10;
+ for (int i = 0; i < numKeys; i++) {
+ sendAsync("key-" + std::to_string(i), "value");
+ }
+
+ ASSERT_EQ(ResultOk, producer_.close());
+
+ // After close() completed, all callbacks should have failed with ResultAlreadyClosed
+ std::lock_guard<std::mutex> lock(mtx);
+ ASSERT_EQ(results.size(), numKeys);
+ for (int i = 0; i < numKeys; i++) {
+ ASSERT_EQ(results[i], ResultAlreadyClosed) << " results[" << i << "] is " << results[i];
+ }
+}