You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/12 04:12:51 UTC
[pulsar] branch master updated: Fix message id error if messages
were sent to a partitioned topic (#6938)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 15cb920 Fix message id error if messages were sent to a partitioned topic (#6938)
15cb920 is described below
commit 15cb920b394874d37039df5e7665092651c28fae
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue May 12 12:12:43 2020 +0800
Fix message id error if messages were sent to a partitioned topic (#6938)
### Motivation
If messages were sent to a partitioned topic, the message id's `partition` field was always -1 because the SendReceipt command only contains the ledger id and the entry id.
### Modifications
- Add a `partition` field to `ProducerImpl` and use it to set the `MessageId`'s `partition` field later in the `ackReceived` method.
- Add a test that checks the message id in the send callback when messages are sent to a partitioned topic.
---
pulsar-client-cpp/lib/PartitionedProducerImpl.cc | 2 +-
pulsar-client-cpp/lib/ProducerImpl.cc | 8 +++-
pulsar-client-cpp/lib/ProducerImpl.h | 3 +-
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 54 ++++++++++++++++++++++++
4 files changed, 63 insertions(+), 4 deletions(-)
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index 0461ee3..628afbc 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -90,7 +90,7 @@ unsigned int PartitionedProducerImpl::getNumPartitionsWithLock() const {
ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition) const {
using namespace std::placeholders;
std::string topicPartitionName = topicName_->getTopicPartitionName(partition);
- auto producer = std::make_shared<ProducerImpl>(client_, topicPartitionName, conf_);
+ auto producer = std::make_shared<ProducerImpl>(client_, topicPartitionName, conf_, partition);
producer->getProducerCreatedFuture().addListener(
std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerCreated,
const_cast<PartitionedProducerImpl*>(this)->shared_from_this(), _1, _2, partition));
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index 8dad6b7..a488a86 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -39,13 +39,15 @@ OpSendMsg::OpSendMsg(uint64_t producerId, uint64_t sequenceId, const Message& ms
sequenceId_(sequenceId),
timeout_(TimeUtils::now() + milliseconds(conf.getSendTimeout())) {}
-ProducerImpl::ProducerImpl(ClientImplPtr client, const std::string& topic, const ProducerConfiguration& conf)
+ProducerImpl::ProducerImpl(ClientImplPtr client, const std::string& topic, const ProducerConfiguration& conf,
+ int32_t partition)
: HandlerBase(
client, topic,
Backoff(milliseconds(100), seconds(60), milliseconds(std::max(100, conf.getSendTimeout() - 100)))),
conf_(conf),
executor_(client->getIOExecutorProvider()->get()),
pendingMessagesQueue_(conf_.getMaxPendingMessages()),
+ partition_(partition),
producerName_(conf_.getProducerName()),
producerStr_("[" + topic_ + ", " + producerName_ + "] "),
producerId_(client->newProducerId()),
@@ -627,7 +629,9 @@ bool ProducerImpl::removeCorruptMessage(uint64_t sequenceId) {
}
}
-bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& messageId) {
+bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
+ MessageId messageId(partition_, rawMessageId.ledgerId(), rawMessageId.entryId(),
+ rawMessageId.batchIndex());
OpSendMsg op;
Lock lock(mutex_);
bool havePendingAck = pendingMessagesQueue_.peek(op);
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h
index 80927b1..e4f35d4 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -63,7 +63,7 @@ class ProducerImpl : public HandlerBase,
public ProducerImplBase {
public:
ProducerImpl(ClientImplPtr client, const std::string& topic,
- const ProducerConfiguration& producerConfiguration);
+ const ProducerConfiguration& producerConfiguration, int32_t partition = -1);
~ProducerImpl();
int keepMaxMessageSize_;
@@ -150,6 +150,7 @@ class ProducerImpl : public HandlerBase,
MessageQueue pendingMessagesQueue_;
+ int32_t partition_; // -1 if topic is non-partitioned
std::string producerName_;
std::string producerStr_;
uint64_t producerId_;
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index ccff387..26bdee5 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -3214,11 +3214,65 @@ TEST(BasicEndToEndTest, testSendCallback) {
Message msg;
ASSERT_EQ(ResultOk, consumer.receive(msg));
receivedIdSet.emplace(msg.getMessageId());
+ consumer.acknowledge(msg);
+ }
+
+ latch.wait();
+ ASSERT_EQ(sentIdSet, receivedIdSet);
+
+ consumer.close();
+ producer.close();
+
+ const std::string partitionedTopicName = topicName + "-" + std::to_string(time(nullptr));
+ const std::string url = adminUrl + "admin/v2/persistent/" +
+ partitionedTopicName.substr(partitionedTopicName.find("://") + 3) + "/partitions";
+ const int numPartitions = 3;
+
+ int res = makePutRequest(url, std::to_string(numPartitions));
+ ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+ std::this_thread::sleep_for(std::chrono::seconds(2));
+
+ ProducerConfiguration producerConfig;
+ producerConfig.setBatchingEnabled(false);
+ producerConfig.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
+ ASSERT_EQ(ResultOk, client.createProducer(partitionedTopicName, producerConfig, producer));
+ ASSERT_EQ(ResultOk, client.subscribe(partitionedTopicName, "SubscriptionName", consumer));
+
+ sentIdSet.clear();
+ receivedIdSet.clear();
+
+ const int numMessages = numPartitions * 2;
+ latch = Latch(numMessages);
+ for (int i = 0; i < numMessages; i++) {
+ const auto msg = MessageBuilder().setContent("a").build();
+ producer.sendAsync(msg, [&sentIdSet, i, &latch](Result result, const MessageId &id) {
+ ASSERT_EQ(ResultOk, result);
+ sentIdSet.emplace(id);
+ latch.countdown();
+ });
+ }
+
+ for (int i = 0; i < numMessages; i++) {
+ Message msg;
+ ASSERT_EQ(ResultOk, consumer.receive(msg));
+ receivedIdSet.emplace(msg.getMessageId());
+ consumer.acknowledge(msg);
}
latch.wait();
ASSERT_EQ(sentIdSet, receivedIdSet);
+ // Collect the partition index carried by every message id seen in the send callbacks.
+ std::set<int> partitionIndexSet;
+ for (const auto &id : sentIdSet) {
+ partitionIndexSet.emplace(id.partition());
+ }
+ // Every index in [0, numPartitions) is expected to appear among the sent message ids.
+ std::set<int> expectedPartitionIndexSet;
+ for (int i = 0; i < numPartitions; i++) {
+ expectedPartitionIndexSet.emplace(i);
+ }
+ // Compare the partition sets; re-checking sentIdSet against receivedIdSet here would
+ // leave the partition field of the send-callback message ids unverified.
+ ASSERT_EQ(expectedPartitionIndexSet, partitionIndexSet);
+
consumer.close();
producer.close();
client.close();