You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2023/11/17 06:28:05 UTC
(pulsar-client-cpp) 03/04: Fix lazy partitioned producer might send duplicated messages (#342)
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
commit 8408e657bf0ae1f34489da25e9aa3de4688bfd6f
Author: Yunze Xu <xy...@163.com>
AuthorDate: Mon Nov 13 10:02:01 2023 +0800
Fix lazy partitioned producer might send duplicated messages (#342)
Fixes https://github.com/apache/pulsar-client-cpp/issues/341
### Motivation
When a lazy partitioned producer sends two messages, the flow is:
1. `start` is called to grab the connection via `grab()`.
2. Generate 0 as the sequence id of the 1st message.
3. Add the 1st message into the queue.
4. When the connection is established, `msgSequenceGenerator_` is reset from
1 to 0.
5. When sending the 2nd message, 0 is generated again as the sequence id,
so the two messages end up with the same sequence id.
### Modifications
For lazy partitioned producers, if the internal producer has not started yet,
defer sending the message to a callback on that producer's creation future.
Add `ChunkDedupTest#testLazyPartitionedProducer` to verify the fix. It lives in
this test suite because only `tests/chunkdedup/docker-compose.yml` enables deduplication.
(cherry picked from commit bb16f24bb68699c15eddba3163cfbef0b1282ebf)
---
lib/PartitionedProducerImpl.cc | 22 +++++++++++++++++++---
lib/ProducerImpl.cc | 8 ++------
lib/ProducerImpl.h | 7 +++++--
tests/CMakeLists.txt | 2 +-
tests/chunkdedup/ChunkDedupTest.cc | 26 ++++++++++++++++++++++++++
5 files changed, 53 insertions(+), 12 deletions(-)
diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc
index f7957d6..a799d5c 100644
--- a/lib/PartitionedProducerImpl.cc
+++ b/lib/PartitionedProducerImpl.cc
@@ -198,7 +198,9 @@ void PartitionedProducerImpl::createLazyPartitionProducer(unsigned int partition
// override
void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
if (state_ != Ready) {
- callback(ResultAlreadyClosed, msg.getMessageId());
+ if (callback) {
+ callback(ResultAlreadyClosed, msg.getMessageId());
+ }
return;
}
@@ -209,7 +211,9 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac
LOG_ERROR("Got Invalid Partition for message from Router Policy, Partition - " << partition);
// change me: abort or notify failure in callback?
// change to appropriate error if callback
- callback(ResultUnknownError, msg.getMessageId());
+ if (callback) {
+ callback(ResultUnknownError, msg.getMessageId());
+ }
return;
}
// find a producer for that partition, index should start from 0
@@ -223,7 +227,19 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac
producersLock.unlock();
// send message on that partition
- producer->sendAsync(msg, callback);
+ if (!conf_.getLazyStartPartitionedProducers() || producer->ready()) {
+ producer->sendAsync(msg, std::move(callback));
+ } else {
+ // Wrapping the callback into a lambda has overhead, so we check if the producer is ready first
+ producer->getProducerCreatedFuture().addListener(
+ [msg, callback](Result result, ProducerImplBaseWeakPtr weakProducer) {
+ if (result == ResultOk) {
+ weakProducer.lock()->sendAsync(msg, std::move(callback));
+ } else if (callback) {
+ callback(result, {});
+ }
+ });
+ }
}
// override
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 80ee735..76a999a 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -60,8 +60,9 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
userProvidedProducerName_(false),
producerStr_("[" + topic() + ", " + producerName_ + "] "),
producerId_(client->newProducerId()),
- msgSequenceGenerator_(0),
batchTimer_(executor_->createDeadlineTimer()),
+ lastSequenceIdPublished_(conf.getInitialSequenceId()),
+ msgSequenceGenerator_(lastSequenceIdPublished_ + 1),
sendTimer_(executor_->createDeadlineTimer()),
dataKeyRefreshTask_(*executor_, 4 * 60 * 60 * 1000),
memoryLimitController_(client->getMemoryLimitController()),
@@ -69,11 +70,6 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
interceptors_(interceptors) {
LOG_DEBUG("ProducerName - " << producerName_ << " Created producer on topic " << topic()
<< " id: " << producerId_);
-
- int64_t initialSequenceId = conf.getInitialSequenceId();
- lastSequenceIdPublished_ = initialSequenceId;
- msgSequenceGenerator_ = initialSequenceId + 1;
-
if (!producerName_.empty()) {
userProvidedProducerName_ = true;
}
diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h
index 91b9544..2fb0b88 100644
--- a/lib/ProducerImpl.h
+++ b/lib/ProducerImpl.h
@@ -19,6 +19,7 @@
#ifndef LIB_PRODUCERIMPL_H_
#define LIB_PRODUCERIMPL_H_
+#include <atomic>
#include <boost/optional.hpp>
#include <list>
#include <memory>
@@ -103,6 +104,8 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase {
ProducerImplWeakPtr weak_from_this() noexcept { return shared_from_this(); }
+ bool ready() const { return producerCreatedPromise_.isComplete(); }
+
protected:
ProducerStatsBasePtr producerStatsBasePtr_;
@@ -169,13 +172,13 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase {
bool userProvidedProducerName_;
std::string producerStr_;
uint64_t producerId_;
- int64_t msgSequenceGenerator_;
std::unique_ptr<BatchMessageContainerBase> batchMessageContainer_;
DeadlineTimerPtr batchTimer_;
PendingFailures batchMessageAndSend(const FlushCallback& flushCallback = nullptr);
- volatile int64_t lastSequenceIdPublished_;
+ std::atomic<int64_t> lastSequenceIdPublished_;
+ std::atomic<int64_t> msgSequenceGenerator_;
std::string schemaVersion_;
DeadlineTimerPtr sendTimer_;
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 993c2fd..3bc5a15 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -72,5 +72,5 @@ add_executable(Oauth2Test oauth2/Oauth2Test.cc)
target_compile_options(Oauth2Test PRIVATE -DTEST_CONF_DIR="${TEST_CONF_DIR}")
target_link_libraries(Oauth2Test ${CLIENT_LIBS} pulsarStatic ${GTEST_LIBRARY_PATH})
-add_executable(ChunkDedupTest chunkdedup/ChunkDedupTest.cc)
+add_executable(ChunkDedupTest chunkdedup/ChunkDedupTest.cc HttpHelper.cc)
target_link_libraries(ChunkDedupTest ${CLIENT_LIBS} pulsarStatic ${GTEST_LIBRARY_PATH})
diff --git a/tests/chunkdedup/ChunkDedupTest.cc b/tests/chunkdedup/ChunkDedupTest.cc
index 609511f..4c9c28e 100644
--- a/tests/chunkdedup/ChunkDedupTest.cc
+++ b/tests/chunkdedup/ChunkDedupTest.cc
@@ -18,7 +18,9 @@
*/
#include <gtest/gtest.h>
#include <pulsar/Client.h>
+#include <time.h>
+#include "../HttpHelper.h"
#include "lib/Latch.h"
#include "lib/LogUtils.h"
@@ -47,6 +49,30 @@ TEST(ChunkDedupTest, testSendChunks) {
client.close();
}
+TEST(ChunkDedupTest, testLazyPartitionedProducer) {  // regression test for issue #341
+    std::string topic = "test-lazy-partitioned-producer-" + std::to_string(time(nullptr));
+    Client client{"pulsar://localhost:6650"};
+    ProducerConfiguration conf;
+    conf.setLazyStartPartitionedProducers(true);  // exercise the lazy-start path under test
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, conf, producer));
+
+    constexpr int numPartitions = 3;
+    int res =
+        makePutRequest("http://localhost:8080/admin/v2/persistent/public/default/" + topic + "/partitions",
+                       std::to_string(numPartitions));
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+    for (int i = 0; i < 10; i++) {
+        const auto key = std::to_string(i % numPartitions);  // cycle through numPartitions distinct keys
+        MessageId msgId;
+        producer.send(MessageBuilder().setPartitionKey(key).setContent("msg-" + std::to_string(i)).build(),
+                      msgId);  // NOTE(review): returned Result is ignored; only msgId is checked below
+        ASSERT_TRUE(msgId.ledgerId() >= 0 && msgId.entryId() >= 0) << "i: " << i << ", msgId: " << msgId;
+    }
+    client.close();
+}
+
int main(int argc, char* argv[]) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();