You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2023/10/07 05:40:30 UTC
[pulsar-client-cpp] branch main updated: [fix] Fix consumer doesn't acknowledge all chunk message Ids (#321)
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new eea59bb [fix] Fix consumer doesn't acknowledge all chunk message Ids (#321)
eea59bb is described below
commit eea59bb4458c6ea190f51732d41fdb32c9ad6327
Author: Zike Yang <zi...@apache.org>
AuthorDate: Sat Oct 7 13:40:24 2023 +0800
[fix] Fix consumer doesn't acknowledge all chunk message Ids (#321)
---
lib/AckGroupingTracker.cc | 33 +++++++++++++++++++++++++++----
lib/ChunkMessageIdImpl.h | 18 ++++++++---------
lib/Commands.cc | 6 +++---
lib/ConsumerImpl.cc | 8 ++++----
lib/ConsumerImpl.h | 4 ++--
lib/MessageId.cc | 19 +++++++++---------
lib/OpSendMsg.h | 8 +++++---
lib/ProducerImpl.cc | 19 +++++++++---------
tests/MessageChunkingTest.cc | 47 +++++++++++++++++++++++++++++---------------
9 files changed, 100 insertions(+), 62 deletions(-)
diff --git a/lib/AckGroupingTracker.cc b/lib/AckGroupingTracker.cc
index 9a47135..ab7381f 100644
--- a/lib/AckGroupingTracker.cc
+++ b/lib/AckGroupingTracker.cc
@@ -21,8 +21,10 @@
#include <atomic>
#include <limits>
+#include <set>
#include "BitSet.h"
+#include "ChunkMessageIdImpl.h"
#include "ClientConnection.h"
#include "Commands.h"
#include "LogUtils.h"
@@ -42,6 +44,17 @@ void AckGroupingTracker::doImmediateAck(const MessageId& msgId, ResultCallback c
}
return;
}
+ if (ackType == CommandAck_AckType_Individual) {
+ // For an individual ack, we need to acknowledge every message ID contained in a chunked message ID.
+ // For a cumulative ack, we only need to ack the last message ID of a chunked message.
+ // A ChunkMessageIdImpl exposes the last chunk's message ID by default, so that case needs no special handling.
+ if (auto chunkMessageId =
+ std::dynamic_pointer_cast<ChunkMessageIdImpl>(Commands::getMessageIdImpl(msgId))) {
+ auto msgIdList = chunkMessageId->getChunkedMessageIds();
+ doImmediateAck(std::set<MessageId>(msgIdList.begin(), msgIdList.end()), callback);
+ return;
+ }
+ }
const auto& ackSet = Commands::getMessageIdImpl(msgId)->getBitSet();
if (waitResponse_) {
const auto requestId = requestIdSupplier_();
@@ -84,29 +97,41 @@ void AckGroupingTracker::doImmediateAck(const std::set<MessageId>& msgIds, Resul
return;
}
+ std::set<MessageId> ackMsgIds;
+
+ for (const auto& msgId : msgIds) {
+ if (auto chunkMessageId =
+ std::dynamic_pointer_cast<ChunkMessageIdImpl>(Commands::getMessageIdImpl(msgId))) {
+ auto msgIdList = chunkMessageId->getChunkedMessageIds();
+ ackMsgIds.insert(msgIdList.begin(), msgIdList.end());
+ } else {
+ ackMsgIds.insert(msgId);
+ }
+ }
+
if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) {
if (waitResponse_) {
const auto requestId = requestIdSupplier_();
- cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, msgIds, requestId), requestId)
+ cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, ackMsgIds, requestId), requestId)
.addListener([callback](Result result, const ResponseData&) {
if (callback) {
callback(result);
}
});
} else {
- cnx->sendCommand(Commands::newMultiMessageAck(consumerId_, msgIds));
+ cnx->sendCommand(Commands::newMultiMessageAck(consumerId_, ackMsgIds));
if (callback) {
callback(ResultOk);
}
}
} else {
- auto count = std::make_shared<std::atomic<size_t>>(msgIds.size());
+ auto count = std::make_shared<std::atomic<size_t>>(ackMsgIds.size());
auto wrappedCallback = [callback, count](Result result) {
if (--*count == 0 && callback) {
callback(result);
}
};
- for (auto&& msgId : msgIds) {
+ for (auto&& msgId : ackMsgIds) {
doImmediateAck(msgId, wrappedCallback, CommandAck_AckType_Individual);
}
}
diff --git a/lib/ChunkMessageIdImpl.h b/lib/ChunkMessageIdImpl.h
index 3081ff0..3fb0f13 100644
--- a/lib/ChunkMessageIdImpl.h
+++ b/lib/ChunkMessageIdImpl.h
@@ -28,21 +28,19 @@ class ChunkMessageIdImpl;
typedef std::shared_ptr<ChunkMessageIdImpl> ChunkMessageIdImplPtr;
class ChunkMessageIdImpl : public MessageIdImpl, public std::enable_shared_from_this<ChunkMessageIdImpl> {
public:
- ChunkMessageIdImpl() : firstChunkMsgId_(std::make_shared<MessageIdImpl>()) {}
-
- void setFirstChunkMessageId(const MessageId& msgId) { *firstChunkMsgId_ = *msgId.impl_; }
-
- void setLastChunkMessageId(const MessageId& msgId) {
- this->ledgerId_ = msgId.ledgerId();
- this->entryId_ = msgId.entryId();
- this->partition_ = msgId.partition();
+ explicit ChunkMessageIdImpl(std::vector<MessageId>&& chunkedMessageIds)
+ : chunkedMessageIds_(std::move(chunkedMessageIds)) {
+ auto lastChunkMsgId = chunkedMessageIds_.back();
+ this->ledgerId_ = lastChunkMsgId.ledgerId();
+ this->entryId_ = lastChunkMsgId.entryId();
+ this->partition_ = lastChunkMsgId.partition();
}
- std::shared_ptr<const MessageIdImpl> getFirstChunkMessageId() const { return firstChunkMsgId_; }
+ const std::vector<MessageId>& getChunkedMessageIds() const noexcept { return chunkedMessageIds_; }
MessageId build() { return MessageId{std::dynamic_pointer_cast<MessageIdImpl>(shared_from_this())}; }
private:
- std::shared_ptr<MessageIdImpl> firstChunkMsgId_;
+ std::vector<MessageId> chunkedMessageIds_;
};
} // namespace pulsar
diff --git a/lib/Commands.cc b/lib/Commands.cc
index f2e6c6d..4b10b73 100644
--- a/lib/Commands.cc
+++ b/lib/Commands.cc
@@ -583,9 +583,9 @@ SharedBuffer Commands::newSeek(uint64_t consumerId, uint64_t requestId, const Me
auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(messageId.impl_);
if (chunkMsgId) {
- auto firstId = chunkMsgId->getFirstChunkMessageId();
- messageIdData.set_ledgerid(firstId->ledgerId_);
- messageIdData.set_entryid(firstId->entryId_);
+ const auto& firstId = chunkMsgId->getChunkedMessageIds().front();
+ messageIdData.set_ledgerid(firstId.ledgerId());
+ messageIdData.set_entryid(firstId.entryId());
} else {
messageIdData.set_ledgerid(messageId.ledgerId());
messageIdData.set_entryid(messageId.entryId());
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index d82cf78..52f0440 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -479,10 +479,7 @@ boost::optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuff
return boost::none;
}
- ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>();
- chunkMsgId->setFirstChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().front());
- chunkMsgId->setLastChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().back());
- messageId = chunkMsgId->build();
+ messageId = std::make_shared<ChunkMessageIdImpl>(chunkedMsgCtx.moveChunkedMessageIds())->build();
LOG_DEBUG("Chunked message completed chunkId: " << chunkId << ", ChunkedMessageCtx: " << chunkedMsgCtx
<< ", sequenceId: " << metadata.sequence_id());
@@ -1174,6 +1171,9 @@ std::pair<MessageId, bool> ConsumerImpl::prepareIndividualAck(const MessageId& m
(batchSize > 0) ? batchSize : 1);
unAckedMessageTrackerPtr_->remove(messageId);
possibleSendToDeadLetterTopicMessages_.remove(messageId);
+ if (std::dynamic_pointer_cast<ChunkMessageIdImpl>(messageIdImpl)) {
+ return std::make_pair(messageId, true);
+ }
return std::make_pair(discardBatch(messageId), true);
} else if (config_.isBatchIndexAckEnabled()) {
return std::make_pair(messageId, true);
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index def2543..690d8fc 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -270,6 +270,8 @@ class ConsumerImpl : public ConsumerImplBase {
const std::vector<MessageId>& getChunkedMessageIds() const noexcept { return chunkedMessageIds_; }
+ std::vector<MessageId> moveChunkedMessageIds() noexcept { return std::move(chunkedMessageIds_); }
+
long getReceivedTimeMs() const noexcept { return receivedTimeMs_; }
friend std::ostream& operator<<(std::ostream& os, const ChunkedMessageCtx& ctx) {
@@ -292,8 +294,6 @@ class ConsumerImpl : public ConsumerImplBase {
// concurrently on the topic) then it guards against broken chunked message which was not fully published
const bool autoAckOldestChunkedMessageOnQueueFull_;
- // The key is UUID, value is the associated ChunkedMessageCtx of the chunked message.
- std::unordered_map<std::string, ChunkedMessageCtx> chunkedMessagesMap_;
// This list contains all the keys of `chunkedMessagesMap_`, each key is an UUID that identifies a pending
// chunked message. Once the number of pending chunked messages exceeds the limit, the oldest UUIDs and
// the associated ChunkedMessageCtx will be removed.
diff --git a/lib/MessageId.cc b/lib/MessageId.cc
index 12b6f40..b51fd9f 100644
--- a/lib/MessageId.cc
+++ b/lib/MessageId.cc
@@ -76,11 +76,11 @@ void MessageId::serialize(std::string& result) const {
auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(impl_);
if (chunkMsgId) {
proto::MessageIdData& firstChunkIdData = *idData.mutable_first_chunk_message_id();
- auto firstChunkId = chunkMsgId->getFirstChunkMessageId();
- firstChunkIdData.set_ledgerid(firstChunkId->ledgerId_);
- firstChunkIdData.set_entryid(firstChunkId->entryId_);
+ const auto& firstChunkId = chunkMsgId->getChunkedMessageIds().front();
+ firstChunkIdData.set_ledgerid(firstChunkId.ledgerId());
+ firstChunkIdData.set_entryid(firstChunkId.entryId());
if (chunkMsgId->partition_ != -1) {
- firstChunkIdData.set_partition(firstChunkId->partition_);
+ firstChunkIdData.set_partition(firstChunkId.partition());
}
}
@@ -99,9 +99,8 @@ MessageId MessageId::deserialize(const std::string& serializedMessageId) {
MessageId msgId = MessageIdBuilder::from(idData).build();
if (idData.has_first_chunk_message_id()) {
- ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>();
- chunkMsgId->setFirstChunkMessageId(MessageIdBuilder::from(idData.first_chunk_message_id()).build());
- chunkMsgId->setLastChunkMessageId(msgId);
+ ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>(
+ std::vector<MessageId>({MessageIdBuilder::from(idData.first_chunk_message_id()).build(), msgId}));
return chunkMsgId->build();
}
@@ -121,9 +120,9 @@ int32_t MessageId::batchSize() const { return impl_->batchSize_; }
PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const pulsar::MessageId& messageId) {
auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(messageId.impl_);
if (chunkMsgId) {
- auto firstId = chunkMsgId->getFirstChunkMessageId();
- s << '(' << firstId->ledgerId_ << ',' << firstId->entryId_ << ',' << firstId->partition_ << ','
- << firstId->batchIndex_ << ");";
+ const auto& firstId = chunkMsgId->getChunkedMessageIds().front();
+ s << '(' << firstId.ledgerId() << ',' << firstId.entryId() << ',' << firstId.partition() << ','
+ << firstId.batchIndex() << ");";
}
s << '(' << messageId.impl_->ledgerId_ << ',' << messageId.impl_->entryId_ << ','
<< messageId.impl_->partition_ << ',' << messageId.impl_->batchIndex_ << ')';
diff --git a/lib/OpSendMsg.h b/lib/OpSendMsg.h
index 06fa77f..a1319e1 100644
--- a/lib/OpSendMsg.h
+++ b/lib/OpSendMsg.h
@@ -45,6 +45,8 @@ struct SendArguments {
SendArguments& operator=(const SendArguments&) = delete;
};
+typedef std::shared_ptr<std::vector<MessageId>> ChunkMessageIdListPtr;
+
struct OpSendMsg {
const Result result;
const int32_t chunkId;
@@ -54,7 +56,7 @@ struct OpSendMsg {
const boost::posix_time::ptime timeout;
const SendCallback sendCallback;
std::vector<std::function<void(Result)>> trackerCallbacks;
- ChunkMessageIdImplPtr chunkedMessageId;
+ ChunkMessageIdListPtr chunkMessageIdList;
// Use shared_ptr here because producer might resend the message with the same arguments
const std::shared_ptr<SendArguments> sendArgs;
@@ -89,7 +91,7 @@ struct OpSendMsg {
sendArgs(nullptr) {}
OpSendMsg(const proto::MessageMetadata& metadata, uint32_t messagesCount, uint64_t messagesSize,
- int sendTimeoutMs, SendCallback&& callback, ChunkMessageIdImplPtr chunkedMessageId,
+ int sendTimeoutMs, SendCallback&& callback, ChunkMessageIdListPtr chunkMessageIdList,
uint64_t producerId, SharedBuffer payload)
: result(ResultOk),
chunkId(metadata.chunk_id()),
@@ -98,7 +100,7 @@ struct OpSendMsg {
messagesSize(messagesSize),
timeout(TimeUtils::now() + boost::posix_time::milliseconds(sendTimeoutMs)),
sendCallback(std::move(callback)),
- chunkedMessageId(chunkedMessageId),
+ chunkMessageIdList(std::move(chunkMessageIdList)),
sendArgs(new SendArguments(producerId, metadata.sequence_id(), metadata, payload)) {}
};
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 3166c19..8bd14f2 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -572,14 +572,14 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& c
}
} else {
const bool sendChunks = (totalChunks > 1);
+ ChunkMessageIdListPtr chunkMessageIdList;
if (sendChunks) {
msgMetadata.set_uuid(producerName_ + "-" + std::to_string(sequenceId));
msgMetadata.set_num_chunks_from_msg(totalChunks);
msgMetadata.set_total_chunk_msg_size(compressedSize);
+ chunkMessageIdList = std::make_shared<std::vector<MessageId>>();
}
- auto chunkMessageId = totalChunks > 1 ? std::make_shared<ChunkMessageIdImpl>() : nullptr;
-
int beginIndex = 0;
for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
if (sendChunks) {
@@ -596,7 +596,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& c
}
auto op = OpSendMsg::create(msgMetadata, 1, uncompressedSize, conf_.getSendTimeout(),
- (chunkId == totalChunks - 1) ? callback : nullptr, chunkMessageId,
+ (chunkId == totalChunks - 1) ? callback : nullptr, chunkMessageIdList,
producerId_, encryptedPayload);
if (!chunkingEnabled_) {
@@ -887,7 +887,7 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
return true;
}
- const auto& op = *pendingMessagesQueue_.front();
+ auto& op = *pendingMessagesQueue_.front();
if (op.result != ResultOk) {
LOG_ERROR("Unexpected OpSendMsg whose result is " << op.result << " for " << sequenceId << " and "
<< rawMessageId);
@@ -911,13 +911,12 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
// Message was persisted correctly
LOG_DEBUG(getName() << "Received ack for msg " << sequenceId);
- if (op.chunkedMessageId) {
+ if (op.chunkMessageIdList) {
// Handling the chunk message id.
- if (op.chunkId == 0) {
- op.chunkedMessageId->setFirstChunkMessageId(messageId);
- } else if (op.chunkId == op.numChunks - 1) {
- op.chunkedMessageId->setLastChunkMessageId(messageId);
- messageId = op.chunkedMessageId->build();
+ op.chunkMessageIdList->push_back(messageId);
+ if (op.chunkId == op.numChunks - 1) {
+ auto chunkedMessageId = std::make_shared<ChunkMessageIdImpl>(std::move(*op.chunkMessageIdList));
+ messageId = chunkedMessageId->build();
}
}
diff --git a/tests/MessageChunkingTest.cc b/tests/MessageChunkingTest.cc
index 6d54a69..f68dd3d 100644
--- a/tests/MessageChunkingTest.cc
+++ b/tests/MessageChunkingTest.cc
@@ -81,7 +81,9 @@ class MessageChunkingTest : public ::testing::TestWithParam<CompressionType> {
}
void createConsumer(const std::string& topic, Consumer& consumer) {
- ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", consumer));
+ ConsumerConfiguration conf;
+ conf.setBrokerConsumerStatsCacheTimeInMs(1000);
+ ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", conf, consumer));
}
void createConsumer(const std::string& topic, Consumer& consumer, ConsumerConfiguration& conf) {
@@ -118,9 +120,6 @@ TEST_P(MessageChunkingTest, testEndToEnd) {
for (int i = 0; i < numMessages; i++) {
MessageId messageId;
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(largeMessage).build(), messageId));
- auto chunkMsgId =
- std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(messageId));
- ASSERT_TRUE(chunkMsgId);
LOG_INFO("Send " << i << " to " << messageId);
sendMessageIds.emplace_back(messageId);
}
@@ -134,19 +133,35 @@ TEST_P(MessageChunkingTest, testEndToEnd) {
ASSERT_EQ(msg.getMessageId().batchIndex(), -1);
ASSERT_EQ(msg.getMessageId().batchSize(), 0);
auto messageId = msg.getMessageId();
- auto chunkMsgId =
- std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(messageId));
- ASSERT_TRUE(chunkMsgId);
receivedMessageIds.emplace_back(messageId);
+ consumer.acknowledge(messageId);
}
ASSERT_EQ(receivedMessageIds, sendMessageIds);
- ASSERT_EQ(receivedMessageIds.front().ledgerId(), receivedMessageIds.front().ledgerId());
+ for (int i = 0; i < sendMessageIds.size(); ++i) {
+ auto sendChunkMsgId =
+ std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(sendMessageIds[i]));
+ ASSERT_TRUE(sendChunkMsgId);
+ auto receiveChunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(
+ PulsarFriend::getMessageIdImpl(receivedMessageIds[i]));
+ ASSERT_TRUE(receiveChunkMsgId);
+ ASSERT_EQ(sendChunkMsgId->getChunkedMessageIds(), receiveChunkMsgId->getChunkedMessageIds());
+ }
ASSERT_GT(receivedMessageIds.back().entryId(), numMessages);
// Verify the cache has been cleared
auto& chunkedMessageCache = PulsarFriend::getChunkedMessageCache(consumer);
ASSERT_EQ(chunkedMessageCache.size(), 0);
+ BrokerConsumerStats consumerStats;
+ waitUntil(
+ std::chrono::seconds(10),
+ [&] {
+ return consumer.getBrokerConsumerStats(consumerStats) == ResultOk &&
+ consumerStats.getMsgBacklog() == 0;
+ },
+ 1000);
+ ASSERT_EQ(consumerStats.getMsgBacklog(), 0);
+
producer.close();
consumer.close();
}
@@ -317,9 +332,9 @@ TEST_P(MessageChunkingTest, testSeekChunkMessages) {
TEST(ChunkMessageIdTest, testSetChunkMessageId) {
MessageId msgId;
{
- ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>();
- chunkMsgId->setFirstChunkMessageId(MessageIdBuilder().ledgerId(1).entryId(2).partition(3).build());
- chunkMsgId->setLastChunkMessageId(MessageIdBuilder().ledgerId(4).entryId(5).partition(6).build());
+ ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>(
+ std::vector<MessageId>({MessageIdBuilder().ledgerId(1).entryId(2).partition(3).build(),
+ MessageIdBuilder().ledgerId(4).entryId(5).partition(6).build()}));
msgId = chunkMsgId->build();
// Test the destructor of the underlying message id should also work for the generated messageId.
}
@@ -332,13 +347,13 @@ TEST(ChunkMessageIdTest, testSetChunkMessageId) {
ASSERT_EQ(deserializedMsgId.entryId(), 5);
ASSERT_EQ(deserializedMsgId.partition(), 6);
- auto chunkMsgId =
+ const auto& chunkMsgId =
std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(deserializedMsgId));
ASSERT_TRUE(chunkMsgId);
- auto firstChunkMsgId = chunkMsgId->getFirstChunkMessageId();
- ASSERT_EQ(firstChunkMsgId->ledgerId_, 1);
- ASSERT_EQ(firstChunkMsgId->entryId_, 2);
- ASSERT_EQ(firstChunkMsgId->partition_, 3);
+ auto firstChunkMsgId = chunkMsgId->getChunkedMessageIds().front();
+ ASSERT_EQ(firstChunkMsgId.ledgerId(), 1);
+ ASSERT_EQ(firstChunkMsgId.entryId(), 2);
+ ASSERT_EQ(firstChunkMsgId.partition(), 3);
}
// The CI env is Ubuntu 16.04, the gtest-dev version is 1.8.0 that doesn't have INSTANTIATE_TEST_SUITE_P