You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/12 13:45:01 UTC

[pulsar] 10/17: Fix message id error if messages were sent to a partitioned topic (#6938)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3421e75e0035063a8eeb2c927f3f23da64c05dff
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue May 12 12:12:43 2020 +0800

    Fix message id error if messages were sent to a partitioned topic (#6938)
    
    ### Motivation
    
    If messages were sent to a partitioned topic, the message id's `partition` field was always -1 because the SendReceipt command contains only the ledger id and the entry id.
    
    ### Modifications
    
    - Add a `partition` field to `ProducerImpl` and later use it to set the `MessageId`'s `partition` field in the `ackReceived` method.
    - Add a test that checks the message id in the send callback when messages are sent to a partitioned topic.
    
    (cherry picked from commit 15cb920b394874d37039df5e7665092651c28fae)
---
 pulsar-client-cpp/lib/PartitionedProducerImpl.cc |  2 +-
 pulsar-client-cpp/lib/ProducerImpl.cc            |  8 +++-
 pulsar-client-cpp/lib/ProducerImpl.h             |  3 +-
 pulsar-client-cpp/tests/BasicEndToEndTest.cc     | 54 ++++++++++++++++++++++++
 4 files changed, 63 insertions(+), 4 deletions(-)

diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index 0461ee3..628afbc 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -90,7 +90,7 @@ unsigned int PartitionedProducerImpl::getNumPartitionsWithLock() const {
 ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition) const {
     using namespace std::placeholders;
     std::string topicPartitionName = topicName_->getTopicPartitionName(partition);
-    auto producer = std::make_shared<ProducerImpl>(client_, topicPartitionName, conf_);
+    auto producer = std::make_shared<ProducerImpl>(client_, topicPartitionName, conf_, partition);
     producer->getProducerCreatedFuture().addListener(
         std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerCreated,
                   const_cast<PartitionedProducerImpl*>(this)->shared_from_this(), _1, _2, partition));
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index 0095dc8..a2547cd 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -39,13 +39,15 @@ OpSendMsg::OpSendMsg(uint64_t producerId, uint64_t sequenceId, const Message& ms
       sequenceId_(sequenceId),
       timeout_(TimeUtils::now() + milliseconds(conf.getSendTimeout())) {}
 
-ProducerImpl::ProducerImpl(ClientImplPtr client, const std::string& topic, const ProducerConfiguration& conf)
+ProducerImpl::ProducerImpl(ClientImplPtr client, const std::string& topic, const ProducerConfiguration& conf,
+                           int32_t partition)
     : HandlerBase(
           client, topic,
           Backoff(milliseconds(100), seconds(60), milliseconds(std::max(100, conf.getSendTimeout() - 100)))),
       conf_(conf),
       executor_(client->getIOExecutorProvider()->get()),
       pendingMessagesQueue_(conf_.getMaxPendingMessages()),
+      partition_(partition),
       producerName_(conf_.getProducerName()),
       producerStr_("[" + topic_ + ", " + producerName_ + "] "),
       producerId_(client->newProducerId()),
@@ -627,7 +629,9 @@ bool ProducerImpl::removeCorruptMessage(uint64_t sequenceId) {
     }
 }
 
-bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& messageId) {
+bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
+    MessageId messageId(partition_, rawMessageId.ledgerId(), rawMessageId.entryId(),
+                        rawMessageId.batchIndex());
     OpSendMsg op;
     Lock lock(mutex_);
     bool havePendingAck = pendingMessagesQueue_.peek(op);
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h
index 80927b1..e4f35d4 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -63,7 +63,7 @@ class ProducerImpl : public HandlerBase,
                      public ProducerImplBase {
    public:
     ProducerImpl(ClientImplPtr client, const std::string& topic,
-                 const ProducerConfiguration& producerConfiguration);
+                 const ProducerConfiguration& producerConfiguration, int32_t partition = -1);
     ~ProducerImpl();
 
     int keepMaxMessageSize_;
@@ -150,6 +150,7 @@ class ProducerImpl : public HandlerBase,
 
     MessageQueue pendingMessagesQueue_;
 
+    int32_t partition_;  // -1 if topic is non-partitioned
     std::string producerName_;
     std::string producerStr_;
     uint64_t producerId_;
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 99b7f87..5277ca4 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -3207,11 +3207,65 @@ TEST(BasicEndToEndTest, testSendCallback) {
         Message msg;
         ASSERT_EQ(ResultOk, consumer.receive(msg));
         receivedIdSet.emplace(msg.getMessageId());
+        consumer.acknowledge(msg);
+    }
+
+    latch.wait();
+    ASSERT_EQ(sentIdSet, receivedIdSet);
+
+    consumer.close();
+    producer.close();
+
+    const std::string partitionedTopicName = topicName + "-" + std::to_string(time(nullptr));
+    const std::string url = adminUrl + "admin/v2/persistent/" +
+                            partitionedTopicName.substr(partitionedTopicName.find("://") + 3) + "/partitions";
+    const int numPartitions = 3;
+
+    int res = makePutRequest(url, std::to_string(numPartitions));
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+    std::this_thread::sleep_for(std::chrono::seconds(2));
+
+    ProducerConfiguration producerConfig;
+    producerConfig.setBatchingEnabled(false);
+    producerConfig.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
+    ASSERT_EQ(ResultOk, client.createProducer(partitionedTopicName, producerConfig, producer));
+    ASSERT_EQ(ResultOk, client.subscribe(partitionedTopicName, "SubscriptionName", consumer));
+
+    sentIdSet.clear();
+    receivedIdSet.clear();
+
+    const int numMessages = numPartitions * 2;
+    latch = Latch(numMessages);
+    for (int i = 0; i < numMessages; i++) {
+        const auto msg = MessageBuilder().setContent("a").build();
+        producer.sendAsync(msg, [&sentIdSet, i, &latch](Result result, const MessageId &id) {
+            ASSERT_EQ(ResultOk, result);
+            sentIdSet.emplace(id);
+            latch.countdown();
+        });
+    }
+
+    for (int i = 0; i < numMessages; i++) {
+        Message msg;
+        ASSERT_EQ(ResultOk, consumer.receive(msg));
+        receivedIdSet.emplace(msg.getMessageId());
+        consumer.acknowledge(msg);
     }
 
     latch.wait();
     ASSERT_EQ(sentIdSet, receivedIdSet);
 
+    std::set<int> partitionIndexSet;
+    for (const auto &id : sentIdSet) {
+        partitionIndexSet.emplace(id.partition());
+    }
+    std::set<int> expectedPartitionIndexSet;
+    for (int i = 0; i < numPartitions; i++) {
+        expectedPartitionIndexSet.emplace(i);
+    }
+    ASSERT_EQ(sentIdSet, receivedIdSet);
+
     consumer.close();
     producer.close();
     client.close();