You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:34:08 UTC

[pulsar] 22/38: [C++] Fix message id is always the default value in send callback (#6812)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0f1636a1aaebed4ee4865d6271c5f29ab5a45c5d
Author: BewareMyPower <xy...@163.com>
AuthorDate: Sat Apr 25 14:57:26 2020 +0800

    [C++] Fix message id is always the default value in send callback (#6812)
    
    * Fix bug: sendCallback's 2nd argument was always the default MessageId
    
    * Set the batch index in the message id passed to each message's callback in a batch
    
    * Add test for message id in send callback
    
    * Ensure all send callbacks have completed before ASSERT_EQ
    (cherry picked from commit fc69a628abb92e3b5ecd8e98b8b00cc3738f4603)
---
 pulsar-client-cpp/lib/BatchMessageContainer.cc | 12 ++++---
 pulsar-client-cpp/lib/BatchMessageContainer.h  | 10 +++---
 pulsar-client-cpp/lib/ProducerImpl.cc          |  2 +-
 pulsar-client-cpp/tests/BasicEndToEndTest.cc   | 37 +++++++++++++++++++
 pulsar-client-cpp/tests/BatchMessageTest.cc    | 50 +++++++++++++++++++++++++-
 5 files changed, 99 insertions(+), 12 deletions(-)

diff --git a/pulsar-client-cpp/lib/BatchMessageContainer.cc b/pulsar-client-cpp/lib/BatchMessageContainer.cc
index 9f904a2..7413d57 100644
--- a/pulsar-client-cpp/lib/BatchMessageContainer.cc
+++ b/pulsar-client-cpp/lib/BatchMessageContainer.cc
@@ -67,7 +67,7 @@ void BatchMessageContainer::add(const Message& msg, SendCallback sendCallback, b
                                                        maxAllowedMessageBatchSizeInBytes_);
     LOG_DEBUG(*this << " After serialization payload size in bytes = " << impl_->payload.readableBytes());
 
-    messagesContainerListPtr_->push_back(MessageContainer(msg, sendCallback, msg.getMessageId()));
+    messagesContainerListPtr_->emplace_back(msg, sendCallback);
 
     LOG_DEBUG(*this << " Number of messages in Batch = " << messagesContainerListPtr_->size());
     LOG_DEBUG(*this << " Batch Payload Size In Bytes = " << batchSizeInBytes_);
@@ -105,7 +105,7 @@ void BatchMessageContainer::sendMessage(FlushCallback flushCallback) {
     if (impl_->payload.readableBytes() > producer_.keepMaxMessageSize_) {
         // At this point the compressed batch is above the overall MaxMessageSize. There
         // can only 1 single message in the batch at this point.
-        batchMessageCallBack(ResultMessageTooBig, messagesContainerListPtr_, nullptr);
+        batchMessageCallBack(ResultMessageTooBig, MessageId{}, messagesContainerListPtr_, nullptr);
         clear();
         return;
     }
@@ -115,7 +115,7 @@ void BatchMessageContainer::sendMessage(FlushCallback flushCallback) {
 
     // bind keeps a copy of the parameters
     SendCallback callback = std::bind(&BatchMessageContainer::batchMessageCallBack, std::placeholders::_1,
-                                      messagesContainerListPtr_, flushCallback);
+                                      std::placeholders::_2, messagesContainerListPtr_, flushCallback);
 
     producer_.sendMessage(msg, callback);
     clear();
@@ -144,7 +144,8 @@ void BatchMessageContainer::clear() {
     batchSizeInBytes_ = 0;
 }
 
-void BatchMessageContainer::batchMessageCallBack(Result r, MessageContainerListPtr messagesContainerListPtr,
+void BatchMessageContainer::batchMessageCallBack(Result r, const MessageId& messageId,
+                                                 MessageContainerListPtr messagesContainerListPtr,
                                                  FlushCallback flushCallback) {
     if (!messagesContainerListPtr) {
         if (flushCallback) {
@@ -156,7 +157,8 @@ void BatchMessageContainer::batchMessageCallBack(Result r, MessageContainerListP
               << r << "] [numOfMessages = " << messagesContainerListPtr->size() << "]");
     size_t batch_size = messagesContainerListPtr->size();
     for (size_t i = 0; i < batch_size; i++) {
-        messagesContainerListPtr->operator[](i).callBack(r);
+        MessageId messageIdInBatch(messageId.partition(), messageId.ledgerId(), messageId.entryId(), i);
+        messagesContainerListPtr->operator[](i).callBack(r, messageIdInBatch);
     }
     if (flushCallback) {
         flushCallback(ResultOk);
diff --git a/pulsar-client-cpp/lib/BatchMessageContainer.h b/pulsar-client-cpp/lib/BatchMessageContainer.h
index 424c7f4..93b4d81 100644
--- a/pulsar-client-cpp/lib/BatchMessageContainer.h
+++ b/pulsar-client-cpp/lib/BatchMessageContainer.h
@@ -45,12 +45,11 @@ namespace pulsar {
 class BatchMessageContainer {
    public:
     struct MessageContainer {
-        MessageContainer(Message message, SendCallback sendCallback, MessageId messageId)
-            : message_(message), sendCallback_(sendCallback), messageId_(messageId) {}
+        MessageContainer(Message message, SendCallback sendCallback)
+            : message_(message), sendCallback_(sendCallback) {}
         Message message_;
         SendCallback sendCallback_;
-        MessageId messageId_;
-        void callBack(const pulsar::Result& r) { sendCallback_(r, messageId_); }
+        void callBack(Result r, const MessageId& messageId) { sendCallback_(r, messageId); }
     };
     typedef std::vector<MessageContainer> MessageContainerList;
     typedef std::shared_ptr<MessageContainerList> MessageContainerListPtr;
@@ -65,7 +64,8 @@ class BatchMessageContainer {
 
     void clear();
 
-    static void batchMessageCallBack(Result r, MessageContainerListPtr messages, FlushCallback callback);
+    static void batchMessageCallBack(Result r, const MessageId& messageId, MessageContainerListPtr messages,
+                                     FlushCallback callback);
 
     friend inline std::ostream& operator<<(std::ostream& os,
                                            const BatchMessageContainer& batchMessageContainer);
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index f38ed2a..0095dc8 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -259,7 +259,7 @@ void ProducerImpl::failPendingMessages(Result result) {
     }
 
     // this function can handle null pointer
-    BatchMessageContainer::batchMessageCallBack(result, messageContainerListPtr, NULL);
+    BatchMessageContainer::batchMessageCallBack(result, MessageId{}, messageContainerListPtr, NULL);
 }
 
 void ProducerImpl::resendMessages(ClientConnectionPtr cnx) {
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 54077ea..99b7f87 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -3179,3 +3179,40 @@ TEST(BasicEndToEndTest, testCumulativeAcknowledgeNotAllowed) {
     }
     client.shutdown();
 }
+
+TEST(BasicEndToEndTest, testSendCallback) {
+    const std::string topicName = "persistent://public/default/BasicEndToEndTest-testSendCallback";
+
+    Client client(lookupUrl);
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
+
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, "SubscriptionName", consumer));
+
+    Latch latch(100);
+    std::set<MessageId> sentIdSet;
+    for (int i = 0; i < 100; i++) {
+        const auto msg = MessageBuilder().setContent("a").build();
+        producer.sendAsync(msg, [&sentIdSet, i, &latch](Result result, const MessageId &id) {
+            ASSERT_EQ(ResultOk, result);
+            sentIdSet.emplace(id);
+            latch.countdown();
+        });
+    }
+
+    std::set<MessageId> receivedIdSet;
+    for (int i = 0; i < 100; i++) {
+        Message msg;
+        ASSERT_EQ(ResultOk, consumer.receive(msg));
+        receivedIdSet.emplace(msg.getMessageId());
+    }
+
+    latch.wait();
+    ASSERT_EQ(sentIdSet, receivedIdSet);
+
+    consumer.close();
+    producer.close();
+    client.close();
+}
diff --git a/pulsar-client-cpp/tests/BatchMessageTest.cc b/pulsar-client-cpp/tests/BatchMessageTest.cc
index 3fe46ed..f9638f8 100644
--- a/pulsar-client-cpp/tests/BatchMessageTest.cc
+++ b/pulsar-client-cpp/tests/BatchMessageTest.cc
@@ -25,6 +25,7 @@
 
 #include <lib/Commands.h>
 #include <lib/Future.h>
+#include <lib/Latch.h>
 #include <lib/LogUtils.h>
 #include <lib/TopicName.h>
 #include <lib/Utils.h>
@@ -982,4 +983,51 @@ TEST(BatchMessageTest, testPraseMessageBatchEntry) {
         ASSERT_EQ(message.getDataAsString(), expected.content);
         ASSERT_EQ(message.getProperty(expected.propKey), expected.propValue);
     }
-}
\ No newline at end of file
+}
+
+TEST(BatchMessageTest, testSendCallback) {
+    const std::string topicName = "persistent://public/default/BasicMessageTest-testSendCallback";
+
+    Client client(lookupUrl);
+
+    constexpr int numMessagesOfBatch = 3;
+
+    ProducerConfiguration producerConfig;
+    producerConfig.setBatchingEnabled(5);
+    producerConfig.setBatchingMaxMessages(numMessagesOfBatch);
+    producerConfig.setBatchingMaxPublishDelayMs(1000);  // 1 s, it's long enough for 3 messages batched
+    producerConfig.setMaxPendingMessages(numMessagesOfBatch);
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig, producer));
+
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, "SubscriptionName", consumer));
+
+    Latch latch(numMessagesOfBatch);
+    std::set<MessageId> sentIdSet;
+    for (int i = 0; i < numMessagesOfBatch; i++) {
+        const auto msg = MessageBuilder().setContent("a").build();
+        producer.sendAsync(msg, [&sentIdSet, i, &latch](Result result, const MessageId& id) {
+            ASSERT_EQ(ResultOk, result);
+            ASSERT_EQ(i, id.batchIndex());
+            sentIdSet.emplace(id);
+            LOG_INFO("id of batch " << i << ": " << id);
+            latch.countdown();
+        });
+    }
+
+    std::set<MessageId> receivedIdSet;
+    for (int i = 0; i < numMessagesOfBatch; i++) {
+        Message msg;
+        ASSERT_EQ(ResultOk, consumer.receive(msg));
+        receivedIdSet.emplace(msg.getMessageId());
+    }
+
+    latch.wait();
+    ASSERT_EQ(sentIdSet, receivedIdSet);
+
+    consumer.close();
+    producer.close();
+    client.close();
+}