You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2023/11/17 06:28:05 UTC

(pulsar-client-cpp) 03/04: Fix lazy partitioned producer might send duplicated messages (#342)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git

commit 8408e657bf0ae1f34489da25e9aa3de4688bfd6f
Author: Yunze Xu <xy...@163.com>
AuthorDate: Mon Nov 13 10:02:01 2023 +0800

    Fix lazy partitioned producer might send duplicated messages (#342)
    
    Fixes https://github.com/apache/pulsar-client-cpp/issues/341
    
    ### Motivation
    
    When a lazy partitioned producer sends two messages, the flow is:
    1. `start` is called to grab the connection via `grab()`.
    2. Generate 0 as the sequence id of the 1st message.
    3. Add the 1st message into the queue.
    4. When the connection is established, `msgSequenceGenerator_` is reset
       from 1 to 0.
    5. When sending the 2nd message, 0 is also generated as the sequence id.
    
    Then two messages have the same sequence id.
    
    ### Modifications
    
    For lazy partitioned producers, if the internal producer is not started
    yet, send the message in the callback of its producer-created future.
    
    Add `ChunkDedupTest#testLazyPartitionedProducer` to verify it since only
    the `tests/chunkdedup/docker-compose.yml` enables the deduplication.
    
    (cherry picked from commit bb16f24bb68699c15eddba3163cfbef0b1282ebf)
---
 lib/PartitionedProducerImpl.cc     | 22 +++++++++++++++++++---
 lib/ProducerImpl.cc                |  8 ++------
 lib/ProducerImpl.h                 |  7 +++++--
 tests/CMakeLists.txt               |  2 +-
 tests/chunkdedup/ChunkDedupTest.cc | 26 ++++++++++++++++++++++++++
 5 files changed, 53 insertions(+), 12 deletions(-)

diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc
index f7957d6..a799d5c 100644
--- a/lib/PartitionedProducerImpl.cc
+++ b/lib/PartitionedProducerImpl.cc
@@ -198,7 +198,9 @@ void PartitionedProducerImpl::createLazyPartitionProducer(unsigned int partition
 // override
 void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
     if (state_ != Ready) {
-        callback(ResultAlreadyClosed, msg.getMessageId());
+        if (callback) {
+            callback(ResultAlreadyClosed, msg.getMessageId());
+        }
         return;
     }
 
@@ -209,7 +211,9 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac
         LOG_ERROR("Got Invalid Partition for message from Router Policy, Partition - " << partition);
         // change me: abort or notify failure in callback?
         //          change to appropriate error if callback
-        callback(ResultUnknownError, msg.getMessageId());
+        if (callback) {
+            callback(ResultUnknownError, msg.getMessageId());
+        }
         return;
     }
     // find a producer for that partition, index should start from 0
@@ -223,7 +227,19 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac
     producersLock.unlock();
 
     // send message on that partition
-    producer->sendAsync(msg, callback);
+    if (!conf_.getLazyStartPartitionedProducers() || producer->ready()) {
+        producer->sendAsync(msg, std::move(callback));
+    } else {
+        // Wrapping the callback into a lambda has overhead, so we check if the producer is ready first
+        producer->getProducerCreatedFuture().addListener(
+            [msg, callback](Result result, ProducerImplBaseWeakPtr weakProducer) {
+                if (result == ResultOk) {
+                    weakProducer.lock()->sendAsync(msg, std::move(callback));
+                } else if (callback) {
+                    callback(result, {});
+                }
+            });
+    }
 }
 
 // override
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 80ee735..76a999a 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -60,8 +60,9 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
       userProvidedProducerName_(false),
       producerStr_("[" + topic() + ", " + producerName_ + "] "),
       producerId_(client->newProducerId()),
-      msgSequenceGenerator_(0),
       batchTimer_(executor_->createDeadlineTimer()),
+      lastSequenceIdPublished_(conf.getInitialSequenceId()),
+      msgSequenceGenerator_(lastSequenceIdPublished_ + 1),
       sendTimer_(executor_->createDeadlineTimer()),
       dataKeyRefreshTask_(*executor_, 4 * 60 * 60 * 1000),
       memoryLimitController_(client->getMemoryLimitController()),
@@ -69,11 +70,6 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
       interceptors_(interceptors) {
     LOG_DEBUG("ProducerName - " << producerName_ << " Created producer on topic " << topic()
                                 << " id: " << producerId_);
-
-    int64_t initialSequenceId = conf.getInitialSequenceId();
-    lastSequenceIdPublished_ = initialSequenceId;
-    msgSequenceGenerator_ = initialSequenceId + 1;
-
     if (!producerName_.empty()) {
         userProvidedProducerName_ = true;
     }
diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h
index 91b9544..2fb0b88 100644
--- a/lib/ProducerImpl.h
+++ b/lib/ProducerImpl.h
@@ -19,6 +19,7 @@
 #ifndef LIB_PRODUCERIMPL_H_
 #define LIB_PRODUCERIMPL_H_
 
+#include <atomic>
 #include <boost/optional.hpp>
 #include <list>
 #include <memory>
@@ -103,6 +104,8 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase {
 
     ProducerImplWeakPtr weak_from_this() noexcept { return shared_from_this(); }
 
+    bool ready() const { return producerCreatedPromise_.isComplete(); }
+
    protected:
     ProducerStatsBasePtr producerStatsBasePtr_;
 
@@ -169,13 +172,13 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase {
     bool userProvidedProducerName_;
     std::string producerStr_;
     uint64_t producerId_;
-    int64_t msgSequenceGenerator_;
 
     std::unique_ptr<BatchMessageContainerBase> batchMessageContainer_;
     DeadlineTimerPtr batchTimer_;
     PendingFailures batchMessageAndSend(const FlushCallback& flushCallback = nullptr);
 
-    volatile int64_t lastSequenceIdPublished_;
+    std::atomic<int64_t> lastSequenceIdPublished_;
+    std::atomic<int64_t> msgSequenceGenerator_;
     std::string schemaVersion_;
 
     DeadlineTimerPtr sendTimer_;
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 993c2fd..3bc5a15 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -72,5 +72,5 @@ add_executable(Oauth2Test oauth2/Oauth2Test.cc)
 target_compile_options(Oauth2Test PRIVATE -DTEST_CONF_DIR="${TEST_CONF_DIR}")
 target_link_libraries(Oauth2Test ${CLIENT_LIBS} pulsarStatic ${GTEST_LIBRARY_PATH})
 
-add_executable(ChunkDedupTest chunkdedup/ChunkDedupTest.cc)
+add_executable(ChunkDedupTest chunkdedup/ChunkDedupTest.cc HttpHelper.cc)
 target_link_libraries(ChunkDedupTest ${CLIENT_LIBS} pulsarStatic ${GTEST_LIBRARY_PATH})
diff --git a/tests/chunkdedup/ChunkDedupTest.cc b/tests/chunkdedup/ChunkDedupTest.cc
index 609511f..4c9c28e 100644
--- a/tests/chunkdedup/ChunkDedupTest.cc
+++ b/tests/chunkdedup/ChunkDedupTest.cc
@@ -18,7 +18,9 @@
  */
 #include <gtest/gtest.h>
 #include <pulsar/Client.h>
+#include <time.h>
 
+#include "../HttpHelper.h"
 #include "lib/Latch.h"
 #include "lib/LogUtils.h"
 
@@ -47,6 +49,30 @@ TEST(ChunkDedupTest, testSendChunks) {
     client.close();
 }
 
+TEST(ChunkDedupTest, testLazyPartitionedProducer) {
+    std::string topic = "test-lazy-partitioned-producer-" + std::to_string(time(nullptr));
+    Client client{"pulsar://localhost:6650"};
+    ProducerConfiguration conf;
+    conf.setLazyStartPartitionedProducers(true);
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, conf, producer));
+
+    constexpr int numPartitions = 3;
+    int res =
+        makePutRequest("http://localhost:8080/admin/v2/persistent/public/default/" + topic + "/partitions",
+                       std::to_string(numPartitions));
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+    for (int i = 0; i < 10; i++) {
+        const auto key = std::to_string(i % numPartitions);
+        MessageId msgId;
+        producer.send(MessageBuilder().setPartitionKey(key).setContent("msg-" + std::to_string(i)).build(),
+                      msgId);
+        ASSERT_TRUE(msgId.ledgerId() >= 0 && msgId.entryId() >= 0) << "i: " << i << ", msgId: " << msgId;
+    }
+    client.close();
+}
+
 int main(int argc, char* argv[]) {
     ::testing::InitGoogleTest(&argc, argv);
     return RUN_ALL_TESTS();