You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/04/24 02:57:47 UTC

[GitHub] [pulsar] BewareMyPower opened a new pull request #6812: [C++] Fix message id is always the default value in send callback

BewareMyPower opened a new pull request #6812:
URL: https://github.com/apache/pulsar/pull/6812


   ### Motivation
   
   After commit of [#4817](https://github.com/apache/pulsar/pull/4811), the send callback's 2nd argument became `MessageId`, but the `MessageId` in callback is always the default value `(-1, -1, -1, -1)`. We need the message id in send callback if messages were sent successfully.
   
   The problem is that the correct message id has been retrieved in `ProducerImpl::ackReceived` but not passed to the user provided callback. Because after messages were sent to `BatchMessageContainer`, the wrapper of user provided callback used the `MessageId` of user constructed `Message` as the 2nd argument, which is always `(-1, -1, -1, -1)`.
   
   ### Modifications
   
   - Remove useless field `messageId` of `BatchMessageContainer::MessageContainer`. Then add `const MessageId&` argument to `batchMessageCallBack` instead.
   - Add tests for message id in send callback. Specially, for a batched message, each internal message's batch index was verified.
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   This change added tests and can be verified as follows:
   
   Run `BasicEndToEndTest.testSendCallback` and `BatchMessageTest.testSendCallback`.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on a change in pull request #6812: [C++] Fix message id is always the default value in send callback

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #6812:
URL: https://github.com/apache/pulsar/pull/6812#discussion_r414720947



##########
File path: pulsar-client-cpp/tests/BatchMessageTest.cc
##########
@@ -982,4 +982,48 @@ TEST(BatchMessageTest, testPraseMessageBatchEntry) {
         ASSERT_EQ(message.getDataAsString(), expected.content);
         ASSERT_EQ(message.getProperty(expected.propKey), expected.propValue);
     }
-}
\ No newline at end of file
+}
+
+TEST(BatchMessageTest, testSendCallback) {
+    const std::string topicName = "persistent://public/default/BasicMessageTest-testSendCallback";
+
+    Client client(lookupUrl);
+
+    constexpr int numMessagesOfBatch = 3;
+
+    ProducerConfiguration producerConfig;
+    producerConfig.setBatchingEnabled(5);
+    producerConfig.setBatchingMaxMessages(numMessagesOfBatch);
+    producerConfig.setBatchingMaxPublishDelayMs(1000);  // 1 s, it's long enough for 3 messages batched
+    producerConfig.setMaxPendingMessages(numMessagesOfBatch);
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig, producer));
+
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, "SubscriptionName", consumer));
+
+    std::set<MessageId> sentIdSet;
+    for (int i = 0; i < numMessagesOfBatch; i++) {
+        const auto msg = MessageBuilder().setContent("a").build();
+        producer.sendAsync(msg, [&sentIdSet, i](Result result, const MessageId& id) {
+            ASSERT_EQ(ResultOk, result);
+            ASSERT_EQ(i, id.batchIndex());
+            sentIdSet.emplace(id);

Review comment:
       Thanks! I found the `Latch` just now :) 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on a change in pull request #6812: [C++] Fix message id is always the default value in send callback

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #6812:
URL: https://github.com/apache/pulsar/pull/6812#discussion_r414700129



##########
File path: pulsar-client-cpp/tests/BatchMessageTest.cc
##########
@@ -982,4 +982,48 @@ TEST(BatchMessageTest, testPraseMessageBatchEntry) {
         ASSERT_EQ(message.getDataAsString(), expected.content);
         ASSERT_EQ(message.getProperty(expected.propKey), expected.propValue);
     }
-}
\ No newline at end of file
+}
+
+TEST(BatchMessageTest, testSendCallback) {
+    const std::string topicName = "persistent://public/default/BasicMessageTest-testSendCallback";
+
+    Client client(lookupUrl);
+
+    constexpr int numMessagesOfBatch = 3;
+
+    ProducerConfiguration producerConfig;
+    producerConfig.setBatchingEnabled(5);
+    producerConfig.setBatchingMaxMessages(numMessagesOfBatch);
+    producerConfig.setBatchingMaxPublishDelayMs(1000);  // 1 s, it's long enough for 3 messages batched
+    producerConfig.setMaxPendingMessages(numMessagesOfBatch);
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig, producer));
+
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, "SubscriptionName", consumer));
+
+    std::set<MessageId> sentIdSet;
+    for (int i = 0; i < numMessagesOfBatch; i++) {
+        const auto msg = MessageBuilder().setContent("a").build();
+        producer.sendAsync(msg, [&sentIdSet, i](Result result, const MessageId& id) {
+            ASSERT_EQ(ResultOk, result);
+            ASSERT_EQ(i, id.batchIndex());
+            sentIdSet.emplace(id);

Review comment:
       Thank you for pointing out this! I missed the point that the `SendReceipt` to producer is not guaranteed to be completed before the consumer's `receive()` done. I'll do some work on it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on pull request #6812: [C++] Fix message id is always the default value in send callback

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #6812:
URL: https://github.com/apache/pulsar/pull/6812#issuecomment-619302707


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] merlimat commented on a change in pull request #6812: [C++] Fix message id is always the default value in send callback

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #6812:
URL: https://github.com/apache/pulsar/pull/6812#discussion_r414282182



##########
File path: pulsar-client-cpp/tests/BatchMessageTest.cc
##########
@@ -982,4 +982,48 @@ TEST(BatchMessageTest, testPraseMessageBatchEntry) {
         ASSERT_EQ(message.getDataAsString(), expected.content);
         ASSERT_EQ(message.getProperty(expected.propKey), expected.propValue);
     }
-}
\ No newline at end of file
+}
+
+TEST(BatchMessageTest, testSendCallback) {
+    const std::string topicName = "persistent://public/default/BasicMessageTest-testSendCallback";
+
+    Client client(lookupUrl);
+
+    constexpr int numMessagesOfBatch = 3;
+
+    ProducerConfiguration producerConfig;
+    producerConfig.setBatchingEnabled(5);
+    producerConfig.setBatchingMaxMessages(numMessagesOfBatch);
+    producerConfig.setBatchingMaxPublishDelayMs(1000);  // 1 s, it's long enough for 3 messages batched
+    producerConfig.setMaxPendingMessages(numMessagesOfBatch);
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig, producer));
+
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, "SubscriptionName", consumer));
+
+    std::set<MessageId> sentIdSet;
+    for (int i = 0; i < numMessagesOfBatch; i++) {
+        const auto msg = MessageBuilder().setContent("a").build();
+        producer.sendAsync(msg, [&sentIdSet, i](Result result, const MessageId& id) {
+            ASSERT_EQ(ResultOk, result);
+            ASSERT_EQ(i, id.batchIndex());
+            sentIdSet.emplace(id);

Review comment:
       There's no hard guarantee that this callback will be executed before the receive loop is done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] merlimat commented on a change in pull request #6812: [C++] Fix message id is always the default value in send callback

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #6812:
URL: https://github.com/apache/pulsar/pull/6812#discussion_r414712921



##########
File path: pulsar-client-cpp/tests/BatchMessageTest.cc
##########
@@ -982,4 +982,48 @@ TEST(BatchMessageTest, testPraseMessageBatchEntry) {
         ASSERT_EQ(message.getDataAsString(), expected.content);
         ASSERT_EQ(message.getProperty(expected.propKey), expected.propValue);
     }
-}
\ No newline at end of file
+}
+
+TEST(BatchMessageTest, testSendCallback) {
+    const std::string topicName = "persistent://public/default/BasicMessageTest-testSendCallback";
+
+    Client client(lookupUrl);
+
+    constexpr int numMessagesOfBatch = 3;
+
+    ProducerConfiguration producerConfig;
+    producerConfig.setBatchingEnabled(5);
+    producerConfig.setBatchingMaxMessages(numMessagesOfBatch);
+    producerConfig.setBatchingMaxPublishDelayMs(1000);  // 1 s, it's long enough for 3 messages batched
+    producerConfig.setMaxPendingMessages(numMessagesOfBatch);
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig, producer));
+
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, "SubscriptionName", consumer));
+
+    std::set<MessageId> sentIdSet;
+    for (int i = 0; i < numMessagesOfBatch; i++) {
+        const auto msg = MessageBuilder().setContent("a").build();
+        producer.sendAsync(msg, [&sentIdSet, i](Result result, const MessageId& id) {
+            ASSERT_EQ(ResultOk, result);
+            ASSERT_EQ(i, id.batchIndex());
+            sentIdSet.emplace(id);

Review comment:
       There is a `Latch` class in `lib/` that might be helpful here. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on pull request #6812: [C++] Fix message id is always the default value in send callback

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #6812:
URL: https://github.com/apache/pulsar/pull/6812#issuecomment-618779875


   /pulsarbot run cpp-tests


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on pull request #6812: [C++] Fix message id is always the default value in send callback

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #6812:
URL: https://github.com/apache/pulsar/pull/6812#issuecomment-619310710


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on pull request #6812: [C++] Fix message id is always the default value in send callback

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #6812:
URL: https://github.com/apache/pulsar/pull/6812#issuecomment-618793369


   /pulsarbot run unit-tests


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org