You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/12 00:51:10 UTC
[pulsar] branch master updated: [C++] UnAckedMessageTracker handles
batch message incorrectly (#9170)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d4c1677 [C++] UnAckedMessageTracker handles batch message incorrectly (#9170)
d4c1677 is described below
commit d4c1677be93e5673672018f79da8a213514f01a3
Author: sijianliang <li...@foxmail.com>
AuthorDate: Tue Jan 12 08:50:35 2021 +0800
[C++] UnAckedMessageTracker handles batch message incorrectly (#9170)
### Motivation
It is necessary to reset the batchIndex of a MessageId when tracking a message, because only the ack of the last message in a batch is
removed from `UnAckedMessageTrackerEnabled`.
### Verifying this change
Added a test case: testBatchUnAckedMessageTracker()
### Modifications
When tracking or untracking a message, reset the batchIndex of the MessageId to -1.
---
pulsar-client-cpp/lib/ConsumerImpl.h | 1 +
.../lib/UnAckedMessageTrackerEnabled.cc | 16 ++---
.../lib/UnAckedMessageTrackerEnabled.h | 5 +-
pulsar-client-cpp/tests/ConsumerTest.cc | 68 ++++++++++++++++++++++
4 files changed, 81 insertions(+), 9 deletions(-)
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index ef60b94..313f1d9 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -223,6 +223,7 @@ class ConsumerImpl : public ConsumerImplBase,
FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
+ FRIEND_TEST(ConsumerTest, testBatchUnAckedMessageTracker);
};
} /* namespace pulsar */
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
index 96dfd6c..3364c0e 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
@@ -90,12 +90,13 @@ UnAckedMessageTrackerEnabled::UnAckedMessageTrackerEnabled(long timeoutMs, long
timeoutHandler();
}
-bool UnAckedMessageTrackerEnabled::add(const MessageId& m) {
+bool UnAckedMessageTrackerEnabled::add(const MessageId& msgId) {
std::lock_guard<std::mutex> acquire(lock_);
- if (messageIdPartitionMap.count(m) == 0) {
+ MessageId id(msgId.partition(), msgId.ledgerId(), msgId.entryId(), -1);
+ if (messageIdPartitionMap.count(id) == 0) {
std::set<MessageId>& partition = timePartitions.back();
- bool emplace = messageIdPartitionMap.emplace(m, partition).second;
- bool insert = partition.insert(m).second;
+ bool emplace = messageIdPartitionMap.emplace(id, partition).second;
+ bool insert = partition.insert(id).second;
return emplace && insert;
}
return false;
@@ -106,13 +107,14 @@ bool UnAckedMessageTrackerEnabled::isEmpty() {
return messageIdPartitionMap.empty();
}
-bool UnAckedMessageTrackerEnabled::remove(const MessageId& m) {
+bool UnAckedMessageTrackerEnabled::remove(const MessageId& msgId) {
std::lock_guard<std::mutex> acquire(lock_);
+ MessageId id(msgId.partition(), msgId.ledgerId(), msgId.entryId(), -1);
bool removed = false;
- std::map<MessageId, std::set<MessageId>&>::iterator exist = messageIdPartitionMap.find(m);
+ std::map<MessageId, std::set<MessageId>&>::iterator exist = messageIdPartitionMap.find(id);
if (exist != messageIdPartitionMap.end()) {
- removed = exist->second.erase(m);
+ removed = exist->second.erase(id);
messageIdPartitionMap.erase(exist);
}
return removed;
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
index 36753dc..12daf45 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
@@ -29,8 +29,8 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
~UnAckedMessageTrackerEnabled();
UnAckedMessageTrackerEnabled(long timeoutMs, const ClientImplPtr, ConsumerImplBase&);
UnAckedMessageTrackerEnabled(long timeoutMs, long tickDuration, const ClientImplPtr, ConsumerImplBase&);
- bool add(const MessageId& m);
- bool remove(const MessageId& m);
+ bool add(const MessageId& msgId);
+ bool remove(const MessageId& msgId);
void removeMessagesTill(const MessageId& msgId);
void removeTopicMessage(const std::string& topic);
void timeoutHandler();
@@ -52,6 +52,7 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
+ FRIEND_TEST(ConsumerTest, testBatchUnAckedMessageTracker);
};
} // namespace pulsar
diff --git a/pulsar-client-cpp/tests/ConsumerTest.cc b/pulsar-client-cpp/tests/ConsumerTest.cc
index 2278c05..e0d4091 100644
--- a/pulsar-client-cpp/tests/ConsumerTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerTest.cc
@@ -20,6 +20,8 @@
#include <thread>
#include <time.h>
#include <set>
+#include <map>
+#include <vector>
#include "gtest/gtest.h"
@@ -345,4 +347,70 @@ TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery) {
client.close();
}
+TEST(ConsumerTest, testBatchUnAckedMessageTracker) {
+ Client client(lookupUrl);
+ const std::string topic = "testBatchUnAckedMessageTracker" + std::to_string(time(nullptr));
+ std::string subName = "sub-batch-un-acked-msg-tracker";
+ constexpr int numOfMessages = 50;
+ constexpr int batchSize = 5;
+ constexpr int batchCount = numOfMessages / batchSize;
+ constexpr int unAckedMessagesTimeoutMs = 10000;
+ constexpr int tickDurationInMs = 1000;
+
+ Consumer consumer;
+ ConsumerConfiguration consumerConfig;
+ consumerConfig.setUnAckedMessagesTimeoutMs(unAckedMessagesTimeoutMs);
+ consumerConfig.setTickDurationInMs(tickDurationInMs);
+ ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+ auto consumerImplPtr = PulsarFriend::getConsumerImplPtr(consumer);
+ auto tracker =
+ static_cast<UnAckedMessageTrackerEnabled*>(consumerImplPtr->unAckedMessageTrackerPtr_.get());
+
+ // send messages
+ ProducerConfiguration producerConfig;
+ producerConfig.setBatchingEnabled(true);
+ producerConfig.setBlockIfQueueFull(true);
+ producerConfig.setBatchingMaxMessages(batchSize);
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfig, producer));
+ std::string prefix = "message-";
+ for (int i = 0; i < numOfMessages; i++) {
+ std::string messageContent = prefix + std::to_string(i);
+ Message msg = MessageBuilder().setContent(messageContent).build();
+ producer.sendAsync(msg, NULL);
+ }
+ producer.close();
+
+ std::map<MessageId, std::vector<MessageId>> msgIdInBatchMap;
+ std::vector<MessageId> messageIds;
+ for (auto i = 0; i < numOfMessages; ++i) {
+ Message msg;
+ ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
+ MessageId msgId = msg.getMessageId();
+ MessageId id(msgId.partition(), msgId.ledgerId(), msgId.entryId(), -1);
+ msgIdInBatchMap[id].emplace_back(msgId);
+ }
+
+ ASSERT_EQ(batchCount, msgIdInBatchMap.size());
+ ASSERT_EQ(batchCount, tracker->size());
+ for (const auto& iter : msgIdInBatchMap) {
+ ASSERT_EQ(iter.second.size(), batchSize);
+ }
+
+ int ackedBatchCount = 0;
+ for (auto iter = msgIdInBatchMap.begin(); iter != msgIdInBatchMap.end(); ++iter) {
+ ASSERT_EQ(batchSize, iter->second.size());
+ for (auto i = 0; i < iter->second.size(); ++i) {
+ ASSERT_EQ(ResultOk, consumer.acknowledge(iter->second[i]));
+ }
+ ackedBatchCount++;
+ ASSERT_EQ(batchCount - ackedBatchCount, tracker->size());
+ }
+ ASSERT_EQ(0, tracker->size());
+ ASSERT_TRUE(tracker->isEmpty());
+
+ consumer.close();
+ client.close();
+}
+
} // namespace pulsar