Posted to commits@pulsar.apache.org by xy...@apache.org on 2023/10/07 05:40:30 UTC

[pulsar-client-cpp] branch main updated: [fix] Fix consumer doesn't acknowledge all chunk message Ids (#321)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new eea59bb  [fix] Fix consumer doesn't acknowledge all chunk message Ids (#321)
eea59bb is described below

commit eea59bb4458c6ea190f51732d41fdb32c9ad6327
Author: Zike Yang <zi...@apache.org>
AuthorDate: Sat Oct 7 13:40:24 2023 +0800

    [fix] Fix consumer doesn't acknowledge all chunk message Ids (#321)
---
 lib/AckGroupingTracker.cc    | 33 +++++++++++++++++++++++++++----
 lib/ChunkMessageIdImpl.h     | 18 ++++++++---------
 lib/Commands.cc              |  6 +++---
 lib/ConsumerImpl.cc          |  8 ++++----
 lib/ConsumerImpl.h           |  4 ++--
 lib/MessageId.cc             | 19 +++++++++---------
 lib/OpSendMsg.h              |  8 +++++---
 lib/ProducerImpl.cc          | 19 +++++++++---------
 tests/MessageChunkingTest.cc | 47 +++++++++++++++++++++++++++++---------------
 9 files changed, 100 insertions(+), 62 deletions(-)
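
The root cause, as the changes below show, is that ChunkMessageIdImpl previously kept only the first and last chunk positions, so an individual acknowledgement of a chunked message only acked the last chunk and the earlier chunks stayed in the subscription backlog. With this commit the full, ordered list of chunk message IDs is kept and expanded at ack time, and the updated testEndToEnd verifies the backlog drains to zero. A minimal end-to-end sketch against the public C++ client API (service URL, topic name and payload size are placeholders):

    #include <pulsar/Client.h>
    #include <string>
    using namespace pulsar;

    int main() {
        Client client("pulsar://localhost:6650");

        ProducerConfiguration producerConf;
        producerConf.setBatchingEnabled(false);  // chunking requires batching to be disabled
        producerConf.setChunkingEnabled(true);
        Producer producer;
        client.createProducer("my-chunked-topic", producerConf, producer);

        Consumer consumer;
        client.subscribe("my-chunked-topic", "my-sub", consumer);

        // A payload larger than the broker's max message size is split into chunks.
        std::string largePayload(10 * 1024 * 1024, 'x');
        MessageId messageId;
        producer.send(MessageBuilder().setContent(largePayload).build(), messageId);

        Message msg;
        consumer.receive(msg);
        // With this fix, acknowledging the reassembled message acks every chunk,
        // so the subscription backlog can drop to zero.
        consumer.acknowledge(msg.getMessageId());

        client.close();
        return 0;
    }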

diff --git a/lib/AckGroupingTracker.cc b/lib/AckGroupingTracker.cc
index 9a47135..ab7381f 100644
--- a/lib/AckGroupingTracker.cc
+++ b/lib/AckGroupingTracker.cc
@@ -21,8 +21,10 @@
 
 #include <atomic>
 #include <limits>
+#include <set>
 
 #include "BitSet.h"
+#include "ChunkMessageIdImpl.h"
 #include "ClientConnection.h"
 #include "Commands.h"
 #include "LogUtils.h"
@@ -42,6 +44,17 @@ void AckGroupingTracker::doImmediateAck(const MessageId& msgId, ResultCallback c
         }
         return;
     }
+    if (ackType == CommandAck_AckType_Individual) {
+        // For an individual ack, we need to acknowledge all message IDs in a chunked message ID.
+        // For a cumulative ack, we only need to ack the last message ID of a chunked message;
+        // ChunkMessageIdImpl returns the last chunk's message ID by default, so no special handling is needed.
+        if (auto chunkMessageId =
+                std::dynamic_pointer_cast<ChunkMessageIdImpl>(Commands::getMessageIdImpl(msgId))) {
+            auto msgIdList = chunkMessageId->getChunkedMessageIds();
+            doImmediateAck(std::set<MessageId>(msgIdList.begin(), msgIdList.end()), callback);
+            return;
+        }
+    }
     const auto& ackSet = Commands::getMessageIdImpl(msgId)->getBitSet();
     if (waitResponse_) {
         const auto requestId = requestIdSupplier_();
@@ -84,29 +97,41 @@ void AckGroupingTracker::doImmediateAck(const std::set<MessageId>& msgIds, Resul
         return;
     }
 
+    std::set<MessageId> ackMsgIds;
+
+    for (const auto& msgId : msgIds) {
+        if (auto chunkMessageId =
+                std::dynamic_pointer_cast<ChunkMessageIdImpl>(Commands::getMessageIdImpl(msgId))) {
+            auto msgIdList = chunkMessageId->getChunkedMessageIds();
+            ackMsgIds.insert(msgIdList.begin(), msgIdList.end());
+        } else {
+            ackMsgIds.insert(msgId);
+        }
+    }
+
     if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) {
         if (waitResponse_) {
             const auto requestId = requestIdSupplier_();
-            cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, msgIds, requestId), requestId)
+            cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, ackMsgIds, requestId), requestId)
                 .addListener([callback](Result result, const ResponseData&) {
                     if (callback) {
                         callback(result);
                     }
                 });
         } else {
-            cnx->sendCommand(Commands::newMultiMessageAck(consumerId_, msgIds));
+            cnx->sendCommand(Commands::newMultiMessageAck(consumerId_, ackMsgIds));
             if (callback) {
                 callback(ResultOk);
             }
         }
     } else {
-        auto count = std::make_shared<std::atomic<size_t>>(msgIds.size());
+        auto count = std::make_shared<std::atomic<size_t>>(ackMsgIds.size());
         auto wrappedCallback = [callback, count](Result result) {
             if (--*count == 0 && callback) {
                 callback(result);
             }
         };
-        for (auto&& msgId : msgIds) {
+        for (auto&& msgId : ackMsgIds) {
             doImmediateAck(msgId, wrappedCallback, CommandAck_AckType_Individual);
         }
     }
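
The tracker change above is the heart of the fix: before building the acknowledgement command, any chunk message ID is expanded into the IDs of all of its chunks. Ignoring the waitResponse_ and protocol-version branches, the expansion amounts to the sketch below (ChunkMessageIdImpl, Commands::getMessageIdImpl and getChunkedMessageIds are the library internals shown in this diff; expandChunkIds is only an illustrative name):

    // Replace every chunk message ID with its constituent chunk IDs so that
    // each chunk gets acknowledged individually.
    std::set<MessageId> expandChunkIds(const std::set<MessageId>& msgIds) {
        std::set<MessageId> ackMsgIds;
        for (const auto& msgId : msgIds) {
            if (auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(
                    Commands::getMessageIdImpl(msgId))) {
                const auto& chunks = chunkMsgId->getChunkedMessageIds();
                ackMsgIds.insert(chunks.begin(), chunks.end());
            } else {
                ackMsgIds.insert(msgId);
            }
        }
        return ackMsgIds;
    }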
diff --git a/lib/ChunkMessageIdImpl.h b/lib/ChunkMessageIdImpl.h
index 3081ff0..3fb0f13 100644
--- a/lib/ChunkMessageIdImpl.h
+++ b/lib/ChunkMessageIdImpl.h
@@ -28,21 +28,19 @@ class ChunkMessageIdImpl;
 typedef std::shared_ptr<ChunkMessageIdImpl> ChunkMessageIdImplPtr;
 class ChunkMessageIdImpl : public MessageIdImpl, public std::enable_shared_from_this<ChunkMessageIdImpl> {
    public:
-    ChunkMessageIdImpl() : firstChunkMsgId_(std::make_shared<MessageIdImpl>()) {}
-
-    void setFirstChunkMessageId(const MessageId& msgId) { *firstChunkMsgId_ = *msgId.impl_; }
-
-    void setLastChunkMessageId(const MessageId& msgId) {
-        this->ledgerId_ = msgId.ledgerId();
-        this->entryId_ = msgId.entryId();
-        this->partition_ = msgId.partition();
+    explicit ChunkMessageIdImpl(std::vector<MessageId>&& chunkedMessageIds)
+        : chunkedMessageIds_(std::move(chunkedMessageIds)) {
+        auto lastChunkMsgId = chunkedMessageIds_.back();
+        this->ledgerId_ = lastChunkMsgId.ledgerId();
+        this->entryId_ = lastChunkMsgId.entryId();
+        this->partition_ = lastChunkMsgId.partition();
     }
 
-    std::shared_ptr<const MessageIdImpl> getFirstChunkMessageId() const { return firstChunkMsgId_; }
+    const std::vector<MessageId>& getChunkedMessageIds() const noexcept { return chunkedMessageIds_; }
 
     MessageId build() { return MessageId{std::dynamic_pointer_cast<MessageIdImpl>(shared_from_this())}; }
 
    private:
-    std::shared_ptr<MessageIdImpl> firstChunkMsgId_;
+    std::vector<MessageId> chunkedMessageIds_;
 };
 }  // namespace pulsar
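
ChunkMessageIdImpl now takes the complete, ordered list of chunk IDs and adopts the last chunk's ledger ID, entry ID and partition as its own position, so existing comparisons and the cumulative-ack path behave as before while getChunkedMessageIds() exposes every chunk. A small sketch of the new constructor, mirroring the updated testSetChunkMessageId further down (internal API):

    auto chunkMsgId = std::make_shared<ChunkMessageIdImpl>(std::vector<MessageId>{
        MessageIdBuilder().ledgerId(1).entryId(2).partition(3).build(),    // first chunk
        MessageIdBuilder().ledgerId(4).entryId(5).partition(6).build()});  // last chunk
    MessageId msgId = chunkMsgId->build();
    // msgId.ledgerId() == 4, msgId.entryId() == 5, msgId.partition() == 6 (the last chunk),
    // while chunkMsgId->getChunkedMessageIds().front() still identifies the first chunk.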
diff --git a/lib/Commands.cc b/lib/Commands.cc
index f2e6c6d..4b10b73 100644
--- a/lib/Commands.cc
+++ b/lib/Commands.cc
@@ -583,9 +583,9 @@ SharedBuffer Commands::newSeek(uint64_t consumerId, uint64_t requestId, const Me
 
     auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(messageId.impl_);
     if (chunkMsgId) {
-        auto firstId = chunkMsgId->getFirstChunkMessageId();
-        messageIdData.set_ledgerid(firstId->ledgerId_);
-        messageIdData.set_entryid(firstId->entryId_);
+        const auto& firstId = chunkMsgId->getChunkedMessageIds().front();
+        messageIdData.set_ledgerid(firstId.ledgerId());
+        messageIdData.set_entryid(firstId.entryId());
     } else {
         messageIdData.set_ledgerid(messageId.ledgerId());
         messageIdData.set_entryid(messageId.entryId());
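
newSeek keeps its behaviour of resolving a chunked message ID to the position of the first chunk, only it now reads that position from the chunk list, so seeking back to a chunked message replays it from its first chunk. From the application side this is simply (sketch; chunkedMessageId stands for the MessageId of a previously received chunked message):

    // Repositions the subscription at the first chunk, so the whole
    // chunked message can be consumed again.
    consumer.seek(chunkedMessageId);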
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index d82cf78..52f0440 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -479,10 +479,7 @@ boost::optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuff
         return boost::none;
     }
 
-    ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>();
-    chunkMsgId->setFirstChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().front());
-    chunkMsgId->setLastChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().back());
-    messageId = chunkMsgId->build();
+    messageId = std::make_shared<ChunkMessageIdImpl>(chunkedMsgCtx.moveChunkedMessageIds())->build();
 
     LOG_DEBUG("Chunked message completed chunkId: " << chunkId << ", ChunkedMessageCtx: " << chunkedMsgCtx
                                                     << ", sequenceId: " << metadata.sequence_id());
@@ -1174,6 +1171,9 @@ std::pair<MessageId, bool> ConsumerImpl::prepareIndividualAck(const MessageId& m
                                                    (batchSize > 0) ? batchSize : 1);
         unAckedMessageTrackerPtr_->remove(messageId);
         possibleSendToDeadLetterTopicMessages_.remove(messageId);
+        if (std::dynamic_pointer_cast<ChunkMessageIdImpl>(messageIdImpl)) {
+            return std::make_pair(messageId, true);
+        }
         return std::make_pair(discardBatch(messageId), true);
     } else if (config_.isBatchIndexAckEnabled()) {
         return std::make_pair(messageId, true);
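
Two consumer-side adjustments support the new model: the reassembled message's ID is now built by moving the complete chunk list out of the ChunkedMessageCtx, and prepareIndividualAck returns a chunk message ID unchanged instead of passing it through discardBatch(), which would have replaced it with a plain MessageId and dropped the chunk list before it ever reached the ack tracker. The construction step, roughly (internal API):

    // Build the composite ID for the fully reassembled chunked message.
    MessageId reassembledId =
        std::make_shared<ChunkMessageIdImpl>(chunkedMsgCtx.moveChunkedMessageIds())->build();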
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index def2543..690d8fc 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -270,6 +270,8 @@ class ConsumerImpl : public ConsumerImplBase {
 
         const std::vector<MessageId>& getChunkedMessageIds() const noexcept { return chunkedMessageIds_; }
 
+        std::vector<MessageId> moveChunkedMessageIds() noexcept { return std::move(chunkedMessageIds_); }
+
         long getReceivedTimeMs() const noexcept { return receivedTimeMs_; }
 
         friend std::ostream& operator<<(std::ostream& os, const ChunkedMessageCtx& ctx) {
@@ -292,8 +294,6 @@ class ConsumerImpl : public ConsumerImplBase {
     // concurrently on the topic) then it guards against broken chunked message which was not fully published
     const bool autoAckOldestChunkedMessageOnQueueFull_;
 
-    // The key is UUID, value is the associated ChunkedMessageCtx of the chunked message.
-    std::unordered_map<std::string, ChunkedMessageCtx> chunkedMessagesMap_;
     // This list contains all the keys of `chunkedMessagesMap_`, each key is an UUID that identifies a pending
     // chunked message. Once the number of pending chunked messages exceeds the limit, the oldest UUIDs and
     // the associated ChunkedMessageCtx will be removed.
diff --git a/lib/MessageId.cc b/lib/MessageId.cc
index 12b6f40..b51fd9f 100644
--- a/lib/MessageId.cc
+++ b/lib/MessageId.cc
@@ -76,11 +76,11 @@ void MessageId::serialize(std::string& result) const {
     auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(impl_);
     if (chunkMsgId) {
         proto::MessageIdData& firstChunkIdData = *idData.mutable_first_chunk_message_id();
-        auto firstChunkId = chunkMsgId->getFirstChunkMessageId();
-        firstChunkIdData.set_ledgerid(firstChunkId->ledgerId_);
-        firstChunkIdData.set_entryid(firstChunkId->entryId_);
+        const auto& firstChunkId = chunkMsgId->getChunkedMessageIds().front();
+        firstChunkIdData.set_ledgerid(firstChunkId.ledgerId());
+        firstChunkIdData.set_entryid(firstChunkId.entryId());
         if (chunkMsgId->partition_ != -1) {
-            firstChunkIdData.set_partition(firstChunkId->partition_);
+            firstChunkIdData.set_partition(firstChunkId.partition());
         }
     }
 
@@ -99,9 +99,8 @@ MessageId MessageId::deserialize(const std::string& serializedMessageId) {
     MessageId msgId = MessageIdBuilder::from(idData).build();
 
     if (idData.has_first_chunk_message_id()) {
-        ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>();
-        chunkMsgId->setFirstChunkMessageId(MessageIdBuilder::from(idData.first_chunk_message_id()).build());
-        chunkMsgId->setLastChunkMessageId(msgId);
+        ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>(
+            std::vector<MessageId>({MessageIdBuilder::from(idData.first_chunk_message_id()).build(), msgId}));
         return chunkMsgId->build();
     }
 
@@ -121,9 +120,9 @@ int32_t MessageId::batchSize() const { return impl_->batchSize_; }
 PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const pulsar::MessageId& messageId) {
     auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(messageId.impl_);
     if (chunkMsgId) {
-        auto firstId = chunkMsgId->getFirstChunkMessageId();
-        s << '(' << firstId->ledgerId_ << ',' << firstId->entryId_ << ',' << firstId->partition_ << ','
-          << firstId->batchIndex_ << ");";
+        const auto& firstId = chunkMsgId->getChunkedMessageIds().front();
+        s << '(' << firstId.ledgerId() << ',' << firstId.entryId() << ',' << firstId.partition() << ','
+          << firstId.batchIndex() << ");";
     }
     s << '(' << messageId.impl_->ledgerId_ << ',' << messageId.impl_->entryId_ << ','
       << messageId.impl_->partition_ << ',' << messageId.impl_->batchIndex_ << ')';
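
The wire format is unchanged: only the first chunk's position is written into first_chunk_message_id next to the last chunk's position, so a deserialized chunked message ID carries a two-element chunk list (first and last) even if the original message had more chunks. The round trip, mirroring testSetChunkMessageId below:

    std::string serialized;
    msgId.serialize(serialized);
    MessageId deserialized = MessageId::deserialize(serialized);
    // deserialized points at the last chunk (ledger 4, entry 5 in the test), and
    // getChunkedMessageIds().front() on its impl recovers the first chunk (ledger 1, entry 2).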
diff --git a/lib/OpSendMsg.h b/lib/OpSendMsg.h
index 06fa77f..a1319e1 100644
--- a/lib/OpSendMsg.h
+++ b/lib/OpSendMsg.h
@@ -45,6 +45,8 @@ struct SendArguments {
     SendArguments& operator=(const SendArguments&) = delete;
 };
 
+typedef std::shared_ptr<std::vector<MessageId>> ChunkMessageIdListPtr;
+
 struct OpSendMsg {
     const Result result;
     const int32_t chunkId;
@@ -54,7 +56,7 @@ struct OpSendMsg {
     const boost::posix_time::ptime timeout;
     const SendCallback sendCallback;
     std::vector<std::function<void(Result)>> trackerCallbacks;
-    ChunkMessageIdImplPtr chunkedMessageId;
+    ChunkMessageIdListPtr chunkMessageIdList;
     // Use shared_ptr here because producer might resend the message with the same arguments
     const std::shared_ptr<SendArguments> sendArgs;
 
@@ -89,7 +91,7 @@ struct OpSendMsg {
           sendArgs(nullptr) {}
 
     OpSendMsg(const proto::MessageMetadata& metadata, uint32_t messagesCount, uint64_t messagesSize,
-              int sendTimeoutMs, SendCallback&& callback, ChunkMessageIdImplPtr chunkedMessageId,
+              int sendTimeoutMs, SendCallback&& callback, ChunkMessageIdListPtr chunkMessageIdList,
               uint64_t producerId, SharedBuffer payload)
         : result(ResultOk),
           chunkId(metadata.chunk_id()),
@@ -98,7 +100,7 @@ struct OpSendMsg {
           messagesSize(messagesSize),
           timeout(TimeUtils::now() + boost::posix_time::milliseconds(sendTimeoutMs)),
           sendCallback(std::move(callback)),
-          chunkedMessageId(chunkedMessageId),
+          chunkMessageIdList(std::move(chunkMessageIdList)),
           sendArgs(new SendArguments(producerId, metadata.sequence_id(), metadata, payload)) {}
 };
 
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 3166c19..8bd14f2 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -572,14 +572,14 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& c
         }
     } else {
         const bool sendChunks = (totalChunks > 1);
+        ChunkMessageIdListPtr chunkMessageIdList;
         if (sendChunks) {
             msgMetadata.set_uuid(producerName_ + "-" + std::to_string(sequenceId));
             msgMetadata.set_num_chunks_from_msg(totalChunks);
             msgMetadata.set_total_chunk_msg_size(compressedSize);
+            chunkMessageIdList = std::make_shared<std::vector<MessageId>>();
         }
 
-        auto chunkMessageId = totalChunks > 1 ? std::make_shared<ChunkMessageIdImpl>() : nullptr;
-
         int beginIndex = 0;
         for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
             if (sendChunks) {
@@ -596,7 +596,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& c
             }
 
             auto op = OpSendMsg::create(msgMetadata, 1, uncompressedSize, conf_.getSendTimeout(),
-                                        (chunkId == totalChunks - 1) ? callback : nullptr, chunkMessageId,
+                                        (chunkId == totalChunks - 1) ? callback : nullptr, chunkMessageIdList,
                                         producerId_, encryptedPayload);
 
             if (!chunkingEnabled_) {
@@ -887,7 +887,7 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
         return true;
     }
 
-    const auto& op = *pendingMessagesQueue_.front();
+    auto& op = *pendingMessagesQueue_.front();
     if (op.result != ResultOk) {
         LOG_ERROR("Unexpected OpSendMsg whose result is " << op.result << " for " << sequenceId << " and "
                                                           << rawMessageId);
@@ -911,13 +911,12 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
     // Message was persisted correctly
     LOG_DEBUG(getName() << "Received ack for msg " << sequenceId);
 
-    if (op.chunkedMessageId) {
+    if (op.chunkMessageIdList) {
         // Handling the chunk message id.
-        if (op.chunkId == 0) {
-            op.chunkedMessageId->setFirstChunkMessageId(messageId);
-        } else if (op.chunkId == op.numChunks - 1) {
-            op.chunkedMessageId->setLastChunkMessageId(messageId);
-            messageId = op.chunkedMessageId->build();
+        op.chunkMessageIdList->push_back(messageId);
+        if (op.chunkId == op.numChunks - 1) {
+            auto chunkedMessageId = std::make_shared<ChunkMessageIdImpl>(std::move(*op.chunkMessageIdList));
+            messageId = chunkedMessageId->build();
         }
     }
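
On the producer side, every chunk's OpSendMsg now shares one ChunkMessageIdListPtr (the typedef added in OpSendMsg.h above): each broker acknowledgement appends that chunk's MessageId to the shared list, and the acknowledgement of the last chunk builds the ChunkMessageIdImpl that is handed back to the caller. The application-facing signature is unchanged; a sketch with a placeholder payload (largePayload):

    // The MessageId delivered to the send callback of a chunked message is the
    // composite ID; internally it now records one MessageId per chunk.
    producer.sendAsync(MessageBuilder().setContent(largePayload).build(),
                       [](Result result, const MessageId& messageId) {
                           if (result == ResultOk) {
                               // messageId compares by the last chunk's position; acknowledging it
                               // after receipt now acknowledges every chunk it contains.
                           }
                       });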
 
diff --git a/tests/MessageChunkingTest.cc b/tests/MessageChunkingTest.cc
index 6d54a69..f68dd3d 100644
--- a/tests/MessageChunkingTest.cc
+++ b/tests/MessageChunkingTest.cc
@@ -81,7 +81,9 @@ class MessageChunkingTest : public ::testing::TestWithParam<CompressionType> {
     }
 
     void createConsumer(const std::string& topic, Consumer& consumer) {
-        ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", consumer));
+        ConsumerConfiguration conf;
+        conf.setBrokerConsumerStatsCacheTimeInMs(1000);
+        ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", conf, consumer));
     }
 
     void createConsumer(const std::string& topic, Consumer& consumer, ConsumerConfiguration& conf) {
@@ -118,9 +120,6 @@ TEST_P(MessageChunkingTest, testEndToEnd) {
     for (int i = 0; i < numMessages; i++) {
         MessageId messageId;
         ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(largeMessage).build(), messageId));
-        auto chunkMsgId =
-            std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(messageId));
-        ASSERT_TRUE(chunkMsgId);
         LOG_INFO("Send " << i << " to " << messageId);
         sendMessageIds.emplace_back(messageId);
     }
@@ -134,19 +133,35 @@ TEST_P(MessageChunkingTest, testEndToEnd) {
         ASSERT_EQ(msg.getMessageId().batchIndex(), -1);
         ASSERT_EQ(msg.getMessageId().batchSize(), 0);
         auto messageId = msg.getMessageId();
-        auto chunkMsgId =
-            std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(messageId));
-        ASSERT_TRUE(chunkMsgId);
         receivedMessageIds.emplace_back(messageId);
+        consumer.acknowledge(messageId);
     }
     ASSERT_EQ(receivedMessageIds, sendMessageIds);
-    ASSERT_EQ(receivedMessageIds.front().ledgerId(), receivedMessageIds.front().ledgerId());
+    for (int i = 0; i < sendMessageIds.size(); ++i) {
+        auto sendChunkMsgId =
+            std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(sendMessageIds[i]));
+        ASSERT_TRUE(sendChunkMsgId);
+        auto receiveChunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(
+            PulsarFriend::getMessageIdImpl(receivedMessageIds[i]));
+        ASSERT_TRUE(receiveChunkMsgId);
+        ASSERT_EQ(sendChunkMsgId->getChunkedMessageIds(), receiveChunkMsgId->getChunkedMessageIds());
+    }
     ASSERT_GT(receivedMessageIds.back().entryId(), numMessages);
 
     // Verify the cache has been cleared
     auto& chunkedMessageCache = PulsarFriend::getChunkedMessageCache(consumer);
     ASSERT_EQ(chunkedMessageCache.size(), 0);
 
+    BrokerConsumerStats consumerStats;
+    waitUntil(
+        std::chrono::seconds(10),
+        [&] {
+            return consumer.getBrokerConsumerStats(consumerStats) == ResultOk &&
+                   consumerStats.getMsgBacklog() == 0;
+        },
+        1000);
+    ASSERT_EQ(consumerStats.getMsgBacklog(), 0);
+
     producer.close();
     consumer.close();
 }
@@ -317,9 +332,9 @@ TEST_P(MessageChunkingTest, testSeekChunkMessages) {
 TEST(ChunkMessageIdTest, testSetChunkMessageId) {
     MessageId msgId;
     {
-        ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>();
-        chunkMsgId->setFirstChunkMessageId(MessageIdBuilder().ledgerId(1).entryId(2).partition(3).build());
-        chunkMsgId->setLastChunkMessageId(MessageIdBuilder().ledgerId(4).entryId(5).partition(6).build());
+        ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>(
+            std::vector<MessageId>({MessageIdBuilder().ledgerId(1).entryId(2).partition(3).build(),
+                                    MessageIdBuilder().ledgerId(4).entryId(5).partition(6).build()}));
         msgId = chunkMsgId->build();
         // Test the destructor of the underlying message id should also work for the generated messageId.
     }
@@ -332,13 +347,13 @@ TEST(ChunkMessageIdTest, testSetChunkMessageId) {
     ASSERT_EQ(deserializedMsgId.entryId(), 5);
     ASSERT_EQ(deserializedMsgId.partition(), 6);
 
-    auto chunkMsgId =
+    const auto& chunkMsgId =
         std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(deserializedMsgId));
     ASSERT_TRUE(chunkMsgId);
-    auto firstChunkMsgId = chunkMsgId->getFirstChunkMessageId();
-    ASSERT_EQ(firstChunkMsgId->ledgerId_, 1);
-    ASSERT_EQ(firstChunkMsgId->entryId_, 2);
-    ASSERT_EQ(firstChunkMsgId->partition_, 3);
+    auto firstChunkMsgId = chunkMsgId->getChunkedMessageIds().front();
+    ASSERT_EQ(firstChunkMsgId.ledgerId(), 1);
+    ASSERT_EQ(firstChunkMsgId.entryId(), 2);
+    ASSERT_EQ(firstChunkMsgId.partition(), 3);
 }
 
 // The CI env is Ubuntu 16.04, the gtest-dev version is 1.8.0 that doesn't have INSTANTIATE_TEST_SUITE_P