You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:34:08 UTC
[pulsar] 22/38: [C++] Fix message id is always the default value in
send callback (#6812)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0f1636a1aaebed4ee4865d6271c5f29ab5a45c5d
Author: BewareMyPower <xy...@163.com>
AuthorDate: Sat Apr 25 14:57:26 2020 +0800
[C++] Fix message id is always the default value in send callback (#6812)
* Fix bug: sendCallback's 2nd argument was always the default MessageId
* Set batch index for each message's callback of batch
* Add test for message id in send callback
* Ensure all send callbacks completed before ASSERT_EQ
(cherry picked from commit fc69a628abb92e3b5ecd8e98b8b00cc3738f4603)
---
pulsar-client-cpp/lib/BatchMessageContainer.cc | 12 ++++---
pulsar-client-cpp/lib/BatchMessageContainer.h | 10 +++---
pulsar-client-cpp/lib/ProducerImpl.cc | 2 +-
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 37 +++++++++++++++++++
pulsar-client-cpp/tests/BatchMessageTest.cc | 50 +++++++++++++++++++++++++-
5 files changed, 99 insertions(+), 12 deletions(-)
diff --git a/pulsar-client-cpp/lib/BatchMessageContainer.cc b/pulsar-client-cpp/lib/BatchMessageContainer.cc
index 9f904a2..7413d57 100644
--- a/pulsar-client-cpp/lib/BatchMessageContainer.cc
+++ b/pulsar-client-cpp/lib/BatchMessageContainer.cc
@@ -67,7 +67,7 @@ void BatchMessageContainer::add(const Message& msg, SendCallback sendCallback, b
maxAllowedMessageBatchSizeInBytes_);
LOG_DEBUG(*this << " After serialization payload size in bytes = " << impl_->payload.readableBytes());
- messagesContainerListPtr_->push_back(MessageContainer(msg, sendCallback, msg.getMessageId()));
+ messagesContainerListPtr_->emplace_back(msg, sendCallback);
LOG_DEBUG(*this << " Number of messages in Batch = " << messagesContainerListPtr_->size());
LOG_DEBUG(*this << " Batch Payload Size In Bytes = " << batchSizeInBytes_);
@@ -105,7 +105,7 @@ void BatchMessageContainer::sendMessage(FlushCallback flushCallback) {
if (impl_->payload.readableBytes() > producer_.keepMaxMessageSize_) {
// At this point the compressed batch is above the overall MaxMessageSize. There
// can only be 1 single message in the batch at this point.
- batchMessageCallBack(ResultMessageTooBig, messagesContainerListPtr_, nullptr);
+ batchMessageCallBack(ResultMessageTooBig, MessageId{}, messagesContainerListPtr_, nullptr);
clear();
return;
}
@@ -115,7 +115,7 @@ void BatchMessageContainer::sendMessage(FlushCallback flushCallback) {
// bind keeps a copy of the parameters
SendCallback callback = std::bind(&BatchMessageContainer::batchMessageCallBack, std::placeholders::_1,
- messagesContainerListPtr_, flushCallback);
+ std::placeholders::_2, messagesContainerListPtr_, flushCallback);
producer_.sendMessage(msg, callback);
clear();
@@ -144,7 +144,8 @@ void BatchMessageContainer::clear() {
batchSizeInBytes_ = 0;
}
-void BatchMessageContainer::batchMessageCallBack(Result r, MessageContainerListPtr messagesContainerListPtr,
+void BatchMessageContainer::batchMessageCallBack(Result r, const MessageId& messageId,
+ MessageContainerListPtr messagesContainerListPtr,
FlushCallback flushCallback) {
if (!messagesContainerListPtr) {
if (flushCallback) {
@@ -156,7 +157,8 @@ void BatchMessageContainer::batchMessageCallBack(Result r, MessageContainerListP
<< r << "] [numOfMessages = " << messagesContainerListPtr->size() << "]");
size_t batch_size = messagesContainerListPtr->size();
for (size_t i = 0; i < batch_size; i++) {
- messagesContainerListPtr->operator[](i).callBack(r);
+ MessageId messageIdInBatch(messageId.partition(), messageId.ledgerId(), messageId.entryId(), i);
+ messagesContainerListPtr->operator[](i).callBack(r, messageIdInBatch);
}
if (flushCallback) {
flushCallback(ResultOk);
diff --git a/pulsar-client-cpp/lib/BatchMessageContainer.h b/pulsar-client-cpp/lib/BatchMessageContainer.h
index 424c7f4..93b4d81 100644
--- a/pulsar-client-cpp/lib/BatchMessageContainer.h
+++ b/pulsar-client-cpp/lib/BatchMessageContainer.h
@@ -45,12 +45,11 @@ namespace pulsar {
class BatchMessageContainer {
public:
struct MessageContainer {
- MessageContainer(Message message, SendCallback sendCallback, MessageId messageId)
- : message_(message), sendCallback_(sendCallback), messageId_(messageId) {}
+ MessageContainer(Message message, SendCallback sendCallback)  // Stores only the message and its send callback; no message id is kept.
+ : message_(message), sendCallback_(sendCallback) {}
Message message_;
SendCallback sendCallback_;
- MessageId messageId_;
- void callBack(const pulsar::Result& r) { sendCallback_(r, messageId_); }
+ void callBack(Result r, const MessageId& messageId) { sendCallback_(r, messageId); }  // Forwards the result and the caller-supplied message id to sendCallback_.
};
typedef std::vector<MessageContainer> MessageContainerList;
typedef std::shared_ptr<MessageContainerList> MessageContainerListPtr;
@@ -65,7 +64,8 @@ class BatchMessageContainer {
void clear();
- static void batchMessageCallBack(Result r, MessageContainerListPtr messages, FlushCallback callback);
+ static void batchMessageCallBack(Result r, const MessageId& messageId, MessageContainerListPtr messages,
+ FlushCallback callback);
friend inline std::ostream& operator<<(std::ostream& os,
const BatchMessageContainer& batchMessageContainer);
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index f38ed2a..0095dc8 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -259,7 +259,7 @@ void ProducerImpl::failPendingMessages(Result result) {
}
// this function can handle null pointer
- BatchMessageContainer::batchMessageCallBack(result, messageContainerListPtr, NULL);
+ BatchMessageContainer::batchMessageCallBack(result, MessageId{}, messageContainerListPtr, NULL);
}
void ProducerImpl::resendMessages(ClientConnectionPtr cnx) {
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 54077ea..99b7f87 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -3179,3 +3179,40 @@ TEST(BasicEndToEndTest, testCumulativeAcknowledgeNotAllowed) {
}
client.shutdown();
}
+
+TEST(BasicEndToEndTest, testSendCallback) {  // The ids handed to the send callbacks must equal the ids the consumer receives.
+ const std::string topicName = "persistent://public/default/BasicEndToEndTest-testSendCallback";
+
+ Client client(lookupUrl);
+
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
+
+ Consumer consumer;
+ ASSERT_EQ(ResultOk, client.subscribe(topicName, "SubscriptionName", consumer));
+
+ Latch latch(100);  // counted down once per completed send callback
+ std::set<MessageId> sentIdSet;  // ids reported to the send callbacks
+ for (int i = 0; i < 100; i++) {
+ const auto msg = MessageBuilder().setContent("a").build();
+ producer.sendAsync(msg, [&sentIdSet, &latch](Result result, const MessageId &id) {  // loop index is not needed inside the callback
+ ASSERT_EQ(ResultOk, result);
+ sentIdSet.emplace(id);
+ latch.countdown();
+ });
+ }
+
+ std::set<MessageId> receivedIdSet;  // ids observed on the consumer side
+ for (int i = 0; i < 100; i++) {
+ Message msg;
+ ASSERT_EQ(ResultOk, consumer.receive(msg));
+ receivedIdSet.emplace(msg.getMessageId());
+ }
+
+ latch.wait();  // every send callback must have run before sentIdSet is compared
+ ASSERT_EQ(sentIdSet, receivedIdSet);
+
+ consumer.close();
+ producer.close();
+ client.close();
+}
diff --git a/pulsar-client-cpp/tests/BatchMessageTest.cc b/pulsar-client-cpp/tests/BatchMessageTest.cc
index 3fe46ed..f9638f8 100644
--- a/pulsar-client-cpp/tests/BatchMessageTest.cc
+++ b/pulsar-client-cpp/tests/BatchMessageTest.cc
@@ -25,6 +25,7 @@
#include <lib/Commands.h>
#include <lib/Future.h>
+#include <lib/Latch.h>
#include <lib/LogUtils.h>
#include <lib/TopicName.h>
#include <lib/Utils.h>
@@ -982,4 +983,51 @@ TEST(BatchMessageTest, testPraseMessageBatchEntry) {
ASSERT_EQ(message.getDataAsString(), expected.content);
ASSERT_EQ(message.getProperty(expected.propKey), expected.propValue);
}
-}
\ No newline at end of file
+}
+
+TEST(BatchMessageTest, testSendCallback) {  // Each callback in a batch must get a message id whose batch index matches its send order.
+ const std::string topicName = "persistent://public/default/BasicMessageTest-testSendCallback";
+
+ Client client(lookupUrl);
+
+ constexpr int numMessagesOfBatch = 3;
+
+ ProducerConfiguration producerConfig;
+ producerConfig.setBatchingEnabled(true);  // boolean switch: pass true rather than an integer literal
+ producerConfig.setBatchingMaxMessages(numMessagesOfBatch);
+ producerConfig.setBatchingMaxPublishDelayMs(1000); // 1 s: long enough for all 3 messages to be batched together
+ producerConfig.setMaxPendingMessages(numMessagesOfBatch);
+
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig, producer));
+
+ Consumer consumer;
+ ASSERT_EQ(ResultOk, client.subscribe(topicName, "SubscriptionName", consumer));
+
+ Latch latch(numMessagesOfBatch);  // counted down once per completed send callback
+ std::set<MessageId> sentIdSet;  // ids reported to the send callbacks
+ for (int i = 0; i < numMessagesOfBatch; i++) {
+ const auto msg = MessageBuilder().setContent("a").build();
+ producer.sendAsync(msg, [&sentIdSet, i, &latch](Result result, const MessageId& id) {
+ ASSERT_EQ(ResultOk, result);
+ ASSERT_EQ(i, id.batchIndex());  // the i-th message sent must carry batch index i
+ sentIdSet.emplace(id);
+ LOG_INFO("id of batch " << i << ": " << id);
+ latch.countdown();
+ });
+ }
+
+ std::set<MessageId> receivedIdSet;  // ids observed on the consumer side
+ for (int i = 0; i < numMessagesOfBatch; i++) {
+ Message msg;
+ ASSERT_EQ(ResultOk, consumer.receive(msg));
+ receivedIdSet.emplace(msg.getMessageId());
+ }
+
+ latch.wait();  // every send callback must have run before sentIdSet is compared
+ ASSERT_EQ(sentIdSet, receivedIdSet);
+
+ consumer.close();
+ producer.close();
+ client.close();
+}