You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/03/25 23:21:00 UTC

[GitHub] merlimat closed pull request #1322: Use private impl for MessageId in c++ client

merlimat closed pull request #1322: Use private impl for MessageId in c++ client
URL: https://github.com/apache/incubator-pulsar/pull/1322
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-cpp/include/pulsar/BatchMessageId.h b/pulsar-client-cpp/include/pulsar/BatchMessageId.h
deleted file mode 100644
index b64788682..000000000
--- a/pulsar-client-cpp/include/pulsar/BatchMessageId.h
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#ifndef LIB_BATCHMESSAGEID_H_
-#define LIB_BATCHMESSAGEID_H_
-
-#include <pulsar/MessageId.h>
-#include <iosfwd>
-
-#pragma GCC visibility push(default)
-
-namespace pulsar {
-
-class PulsarWrapper;
-
-class BatchMessageId : public MessageId {
-   public:
-    BatchMessageId(int64_t ledgerId, int64_t entryId, int batchIndex = -1)
-        : MessageId(ledgerId, entryId), batchIndex_(batchIndex) {}
-
-    BatchMessageId(const MessageId& msgId);
-
-    BatchMessageId() : batchIndex_(-1) {}
-
-    virtual void serialize(std::string& result) const;
-
-    // These functions compare the message order as stored in bookkeeper
-    bool operator<(const BatchMessageId& other) const;
-    bool operator<=(const BatchMessageId& other) const;
-    bool operator==(const BatchMessageId& other) const;
-
-   protected:
-    virtual int64_t getBatchIndex() const;
-
-    friend class Commands;
-    friend class ConsumerImpl;
-    friend class ReaderImpl;
-    friend class Message;
-    friend class MessageImpl;
-    friend class PartitionedProducerImpl;
-    friend class PartitionedConsumerImpl;
-    friend class BatchAcknowledgementTracker;
-    friend class PulsarWrapper;
-    friend class PulsarFriend;
-    int64_t batchIndex_;
-
-    friend std::ostream& operator<<(std::ostream& s, const BatchMessageId& messageId);
-};
-}  // namespace pulsar
-#pragma GCC visibility pop
-
-#endif /* LIB_BATCHMESSAGEID_H_ */
diff --git a/pulsar-client-cpp/include/pulsar/Message.h b/pulsar-client-cpp/include/pulsar/Message.h
index b98b48b84..aff0d9493 100644
--- a/pulsar-client-cpp/include/pulsar/Message.h
+++ b/pulsar-client-cpp/include/pulsar/Message.h
@@ -23,7 +23,8 @@
 #include <string>
 
 #include <boost/shared_ptr.hpp>
-#include "BatchMessageId.h"
+
+#include "MessageId.h"
 
 #pragma GCC visibility push(default)
 
@@ -39,8 +40,6 @@ class MessageBuilder;
 class MessageImpl;
 class PulsarWrapper;
 
-// TODO: When releasing 2.0.0, make all methods virtual and create the virtual destructor for Google Mock
-// tests
 class Message {
    public:
     typedef std::map<std::string, std::string> StringMap;
@@ -128,9 +127,10 @@ class Message {
     MessageImplPtr impl_;
 
     Message(MessageImplPtr& impl);
-    Message(const proto::CommandMessage& msg, proto::MessageMetadata& data, SharedBuffer& payload);
+    Message(const proto::CommandMessage& msg, proto::MessageMetadata& data, SharedBuffer& payload,
+            int32_t partition);
     /// Used for Batch Messages
-    Message(const BatchMessageId& messageID, proto::MessageMetadata& metadata, SharedBuffer& payload,
+    Message(const MessageId& messageID, proto::MessageMetadata& metadata, SharedBuffer& payload,
             proto::SingleMessageMetadata& singleMetadata);
     friend class PartitionedProducerImpl;
     friend class PartitionedConsumerImpl;
diff --git a/pulsar-client-cpp/include/pulsar/MessageId.h b/pulsar-client-cpp/include/pulsar/MessageId.h
index 40fe5d041..149d177ea 100644
--- a/pulsar-client-cpp/include/pulsar/MessageId.h
+++ b/pulsar-client-cpp/include/pulsar/MessageId.h
@@ -27,15 +27,12 @@
 
 namespace pulsar {
 
-class ConsumerImpl;
-class UnAckedMessageTrackerEnabled;
-class PulsarWrapper;
+class MessageIdImpl;
 
 class MessageId {
    public:
     MessageId& operator=(const MessageId&);
     MessageId();
-    virtual ~MessageId() {}
 
     /**
      * MessageId representing the "earliest" or "oldest available" message stored in the topic
@@ -50,34 +47,44 @@ class MessageId {
     /**
      * Serialize the message id into a binary string for storing
      */
-    virtual void serialize(std::string& result) const;
+    void serialize(std::string& result) const;
 
     /**
      * Deserialize a message id from a binary string
      */
-    static boost::shared_ptr<MessageId> deserialize(const std::string& serializedMessageId);
+    static MessageId deserialize(const std::string& serializedMessageId);
 
     // These functions compare the message order as stored in bookkeeper
     bool operator<(const MessageId& other) const;
+    bool operator<=(const MessageId& other) const;
+    bool operator>(const MessageId& other) const;
+    bool operator>=(const MessageId& other) const;
     bool operator==(const MessageId& other) const;
+    bool operator!=(const MessageId& other) const;
 
-   protected:
-    virtual int64_t getBatchIndex() const;
+   private:
     friend class ConsumerImpl;
+    friend class ReaderImpl;
     friend class Message;
     friend class MessageImpl;
     friend class Commands;
-    friend class BatchMessageId;
     friend class PartitionedProducerImpl;
     friend class PartitionedConsumerImpl;
     friend class UnAckedMessageTrackerEnabled;
     friend class BatchAcknowledgementTracker;
     friend class PulsarWrapper;
-    MessageId(int64_t, int64_t);
+    friend class PulsarFriend;
+
+    explicit MessageId(int32_t partition, int64_t ledgerId, int64_t entryId, int32_t batchIndex);
     friend std::ostream& operator<<(std::ostream& s, const MessageId& messageId);
-    int64_t ledgerId_;
-    int64_t entryId_ : 48;
-    short partition_ : 16;
+
+    int64_t ledgerId() const;
+    int64_t entryId() const;
+    int32_t batchIndex() const;
+    int32_t partition() const;
+
+    typedef boost::shared_ptr<MessageIdImpl> MessageIdImplPtr;
+    MessageIdImplPtr impl_;
 };
 }  // namespace pulsar
 
diff --git a/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc b/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc
index 868c0501b..0dcf3be95 100644
--- a/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc
+++ b/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc
@@ -24,7 +24,7 @@ DECLARE_LOG_OBJECT()
 BatchAcknowledgementTracker::BatchAcknowledgementTracker(const std::string topic,
                                                          const std::string subscription,
                                                          const long consumerId)
-    : greatestCumulativeAckSent_(BatchMessageId()) {
+    : greatestCumulativeAckSent_() {
     std::stringstream consumerStrStream;
     consumerStrStream << "BatchAcknowledgementTracker for [" << topic << ", " << subscription << ", "
                       << consumerId << "] ";
@@ -43,7 +43,7 @@ void BatchAcknowledgementTracker::receivedMessage(const Message& message) {
         return;
     }
     Lock lock(mutex_);
-    BatchMessageId msgID = message.impl_->messageId;
+    MessageId msgID = message.impl_->messageId;
 
     // ignore message if it is less than the last cumulative ack sent or messageID is already being tracked
     TrackerMap::iterator pos = trackerMap_.find(msgID);
@@ -60,10 +60,10 @@ void BatchAcknowledgementTracker::receivedMessage(const Message& message) {
         TrackerPair(msgID, boost::dynamic_bitset<>(message.impl_->metadata.num_messages_in_batch()).set()));
 }
 
-void BatchAcknowledgementTracker::deleteAckedMessage(const BatchMessageId& messageId,
+void BatchAcknowledgementTracker::deleteAckedMessage(const MessageId& messageId,
                                                      proto::CommandAck_AckType ackType) {
     // Not a batch message and a individual ack
-    if (messageId.batchIndex_ == -1 && ackType == proto::CommandAck_AckType_Individual) {
+    if (messageId.batchIndex() == -1 && ackType == proto::CommandAck_AckType_Individual) {
         return;
     }
 
@@ -104,18 +104,22 @@ void BatchAcknowledgementTracker::deleteAckedMessage(const BatchMessageId& messa
     }
 }
 
-bool BatchAcknowledgementTracker::isBatchReady(const BatchMessageId& msgID,
+bool BatchAcknowledgementTracker::isBatchReady(const MessageId& msgID,
                                                const proto::CommandAck_AckType ackType) {
     Lock lock(mutex_);
-    TrackerMap::iterator pos = trackerMap_.find(msgID);
-    if (pos == trackerMap_.end() || std::find(sendList_.begin(), sendList_.end(), msgID) != sendList_.end()) {
+    // Remove batch index
+    MessageId batchMessageId = MessageId(msgID.partition(), msgID.ledgerId(), msgID.entryId(),
+                                         -1 /* Batch index */);
+
+    TrackerMap::iterator pos = trackerMap_.find(batchMessageId);
+    if (pos == trackerMap_.end() || std::find(sendList_.begin(), sendList_.end(), batchMessageId) != sendList_.end()) {
         LOG_DEBUG(
             "Batch is ready since message present in sendList_ or not present in trackerMap_ [message ID = "
-            << msgID << "]");
+            << batchMessageId << "]");
         return true;
     }
 
-    int batchIndex = msgID.batchIndex_;
+    int batchIndex = msgID.batchIndex();
     assert(batchIndex < pos->second.size());
     pos->second.set(batchIndex, false);
 
@@ -128,7 +132,7 @@ bool BatchAcknowledgementTracker::isBatchReady(const BatchMessageId& msgID,
     if (pos->second.any()) {
         return false;
     }
-    sendList_.push_back(msgID);
+    sendList_.push_back(batchMessageId);
     trackerMap_.erase(pos);
     LOG_DEBUG("Batch is ready since message all bits are reset in trackerMap_ [message ID = " << msgID
                                                                                               << "]");
@@ -138,22 +142,24 @@ bool BatchAcknowledgementTracker::isBatchReady(const BatchMessageId& msgID,
 // returns
 // - a batch message id < messageId
 // - same messageId if it is the last message in the batch
-const BatchMessageId BatchAcknowledgementTracker::getGreatestCumulativeAckReady(
-    const BatchMessageId& messageId) {
+const MessageId BatchAcknowledgementTracker::getGreatestCumulativeAckReady(const MessageId& messageId) {
     Lock lock(mutex_);
-    BatchMessageId messageReadyForCumulativeAck = BatchMessageId();
-    TrackerMap::iterator pos = trackerMap_.find(messageId);
+
+    // Remove batch index
+    MessageId batchMessageId = MessageId(messageId.partition(), messageId.ledgerId(),
+                                         messageId.entryId(), -1 /* Batch index */);
+    TrackerMap::iterator pos = trackerMap_.find(batchMessageId);
 
     // element not found
     if (pos == trackerMap_.end()) {
-        return BatchMessageId();
+        return MessageId();
     }
 
-    if (pos->second.size() - 1 != messageId.batchIndex_) {
+    if (pos->second.size() - 1 != messageId.batchIndex()) {
         // Can't cumulatively ack this batch message
         if (pos == trackerMap_.begin()) {
             // This was the first message hence we can't decrement the iterator
-            return BatchMessageId();
+            return MessageId();
         }
         pos--;
     }
diff --git a/pulsar-client-cpp/lib/BatchAcknowledgementTracker.h b/pulsar-client-cpp/lib/BatchAcknowledgementTracker.h
index b6f8ba8db..6b20c0002 100644
--- a/pulsar-client-cpp/lib/BatchAcknowledgementTracker.h
+++ b/pulsar-client-cpp/lib/BatchAcknowledgementTracker.h
@@ -19,7 +19,6 @@
 #ifndef LIB_BATCHACKNOWLEDGEMENTTRACKER_H_
 #define LIB_BATCHACKNOWLEDGEMENTTRACKER_H_
 
-#include "pulsar/BatchMessageId.h"
 #include "MessageImpl.h"
 #include <map>
 #include <boost/thread/mutex.hpp>
@@ -36,8 +35,8 @@ class ConsumerImpl;
 class BatchAcknowledgementTracker {
    private:
     typedef boost::unique_lock<boost::mutex> Lock;
-    typedef std::pair<BatchMessageId, boost::dynamic_bitset<> > TrackerPair;
-    typedef std::map<BatchMessageId, boost::dynamic_bitset<> > TrackerMap;
+    typedef std::pair<MessageId, boost::dynamic_bitset<> > TrackerPair;
+    typedef std::map<MessageId, boost::dynamic_bitset<> > TrackerMap;
     boost::mutex mutex_;
 
     TrackerMap trackerMap_;
@@ -48,20 +47,20 @@ class BatchAcknowledgementTracker {
     // batch index
     // is acked again, we just check the sendList to verify that the batch is acked w/o iterating over the
     // dynamic_bitset.
-    std::vector<BatchMessageId> sendList_;
+    std::vector<MessageId> sendList_;
 
     // we don't need to track MessageId < greatestCumulativeAckReceived
-    BatchMessageId greatestCumulativeAckSent_;
+    MessageId greatestCumulativeAckSent_;
     std::string name_;
 
    public:
     BatchAcknowledgementTracker(const std::string topic, const std::string subscription,
                                 const long consumerId);
 
-    bool isBatchReady(const BatchMessageId& msgID, const proto::CommandAck_AckType ackType);
-    const BatchMessageId getGreatestCumulativeAckReady(const BatchMessageId& messageId);
+    bool isBatchReady(const MessageId& msgID, const proto::CommandAck_AckType ackType);
+    const MessageId getGreatestCumulativeAckReady(const MessageId& messageId);
 
-    void deleteAckedMessage(const BatchMessageId& messageId, proto::CommandAck_AckType ackType);
+    void deleteAckedMessage(const MessageId& messageId, proto::CommandAck_AckType ackType);
     void receivedMessage(const Message& message);
 
     void clear();
@@ -72,23 +71,23 @@ class BatchAcknowledgementTracker {
     // Used for Cumulative acks only
     struct SendRemoveCriteria {
        private:
-        const BatchMessageId& messageId_;
+        const MessageId& messageId_;
 
        public:
-        SendRemoveCriteria(const BatchMessageId& messageId) : messageId_(messageId) {}
+        SendRemoveCriteria(const MessageId& messageId) : messageId_(messageId) {}
 
-        bool operator()(const BatchMessageId& element) const { return (element <= messageId_); }
+        bool operator()(const MessageId& element) const { return (element <= messageId_); }
     };
 
     // Used for Cumulative acks only
     struct TrackerMapRemoveCriteria {
        private:
-        const BatchMessageId& messageId_;
+        const MessageId& messageId_;
 
        public:
-        TrackerMapRemoveCriteria(const BatchMessageId& messageId) : messageId_(messageId) {}
+        TrackerMapRemoveCriteria(const MessageId& messageId) : messageId_(messageId) {}
 
-        bool operator()(std::pair<const pulsar::BatchMessageId, boost::dynamic_bitset<> >& element) const {
+        bool operator()(std::pair<const MessageId, boost::dynamic_bitset<> >& element) const {
             return (element.first <= messageId_);
         }
     };
diff --git a/pulsar-client-cpp/lib/BatchMessageId.cc b/pulsar-client-cpp/lib/BatchMessageId.cc
deleted file mode 100644
index 27d7bbd99..000000000
--- a/pulsar-client-cpp/lib/BatchMessageId.cc
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <pulsar/BatchMessageId.h>
-
-#include "PulsarApi.pb.h"
-
-#include <tuple>
-#include <iostream>
-
-namespace pulsar {
-
-BatchMessageId::BatchMessageId(const MessageId& msgId)
-    : MessageId(msgId.ledgerId_, msgId.entryId_), batchIndex_(msgId.getBatchIndex()) {}
-
-void BatchMessageId::serialize(std::string& result) const {
-    proto::MessageIdData idData;
-    idData.set_ledgerid(ledgerId_);
-    idData.set_entryid(entryId_);
-    idData.set_batch_index(batchIndex_);
-
-    if (partition_ != -1) {
-        idData.set_partition(partition_);
-    }
-
-    idData.SerializeToString(&result);
-}
-
-int64_t BatchMessageId::getBatchIndex() const { return batchIndex_; }
-
-#pragma GCC visibility push(default)
-
-bool BatchMessageId::operator<(const BatchMessageId& other) const {
-    if (ledgerId_ < other.ledgerId_) {
-        return true;
-    } else if (ledgerId_ > other.ledgerId_) {
-        return false;
-    }
-
-    if (entryId_ < other.entryId_) {
-        return true;
-    } else {
-        return false;
-    }
-}
-
-bool BatchMessageId::operator<=(const BatchMessageId& other) const { return *this < other || *this == other; }
-
-bool BatchMessageId::operator==(const BatchMessageId& other) const {
-    return ledgerId_ == other.ledgerId_ && entryId_ == other.entryId_ && batchIndex_ == other.batchIndex_;
-}
-
-std::ostream& operator<<(std::ostream& s, const BatchMessageId& messageId) {
-    s << '(' << messageId.ledgerId_ << ':' << messageId.entryId_ << ':' << messageId.batchIndex_ << ':'
-      << messageId.partition_ << ')';
-    return s;
-}
-
-#pragma GCC visibility pop
-}  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index 68d366a01..1390336cd 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -161,14 +161,14 @@ void ClientImpl::createReaderAsync(const std::string& topic, const MessageId& st
         }
     }
 
-    BatchMessageId msgId(startMessageId);
+    MessageId msgId(startMessageId);
     lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
         boost::bind(&ClientImpl::handleReaderMetadataLookup, shared_from_this(), _1, _2, topicName, msgId,
                     conf, callback));
 }
 
 void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDataResultPtr partitionMetadata,
-                                            TopicNamePtr topicName, BatchMessageId startMessageId,
+                                            TopicNamePtr topicName, MessageId startMessageId,
                                             ReaderConfiguration conf, ReaderCallback callback) {
     if (result != ResultOk) {
         LOG_ERROR("Error Checking/Getting Partition Metadata while creating reader: " << result);
diff --git a/pulsar-client-cpp/lib/ClientImpl.h b/pulsar-client-cpp/lib/ClientImpl.h
index 105df8a8b..5283b5872 100644
--- a/pulsar-client-cpp/lib/ClientImpl.h
+++ b/pulsar-client-cpp/lib/ClientImpl.h
@@ -91,7 +91,7 @@ class ClientImpl : public boost::enable_shared_from_this<ClientImpl> {
                          SubscribeCallback callback);
 
     void handleReaderMetadataLookup(const Result result, const LookupDataResultPtr partitionMetadata,
-                                    TopicNamePtr topicName, BatchMessageId startMessageId,
+                                    TopicNamePtr topicName, MessageId startMessageId,
                                     ReaderConfiguration conf, ReaderCallback callback);
 
     void handleProducerCreated(Result result, ProducerImplBaseWeakPtr producerWeakPtr,
diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index e0245c555..82e7abb1b 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -185,7 +185,7 @@ SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication, const
 SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string& subscription,
                                     uint64_t consumerId, uint64_t requestId, CommandSubscribe_SubType subType,
                                     const std::string& consumerName, SubscriptionMode subscriptionMode,
-                                    Optional<BatchMessageId> startMessageId) {
+                                    Optional<MessageId> startMessageId) {
     BaseCommand cmd;
     cmd.set_type(BaseCommand::SUBSCRIBE);
     CommandSubscribe* subscribe = cmd.mutable_subscribe();
@@ -198,11 +198,11 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string&
     subscribe->set_durable(subscriptionMode == SubscriptionModeDurable);
     if (startMessageId.is_present()) {
         MessageIdData& messageIdData = *subscribe->mutable_start_message_id();
-        messageIdData.set_ledgerid(startMessageId.value().ledgerId_);
-        messageIdData.set_entryid(startMessageId.value().entryId_);
+        messageIdData.set_ledgerid(startMessageId.value().ledgerId());
+        messageIdData.set_entryid(startMessageId.value().entryId());
 
-        if (startMessageId.value().batchIndex_ != -1) {
-            messageIdData.set_batch_index(startMessageId.value().batchIndex_);
+        if (startMessageId.value().batchIndex() != -1) {
+            messageIdData.set_batch_index(startMessageId.value().batchIndex());
         }
     }
 
@@ -442,7 +442,7 @@ void Commands::serializeSingleMessageInBatchWithPayload(const Message& msg, Shar
     batchPayLoad.write(msg.impl_->payload.data(), payloadSize);
 }
 
-Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage) {
+Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex) {
     SharedBuffer& uncompressedPayload = batchedMessage.impl_->payload;
 
     // Format of batch message
@@ -459,7 +459,9 @@ Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage) {
     SharedBuffer payload = uncompressedPayload.slice(0, payloadSize);
     uncompressedPayload.consume(payloadSize);
 
-    Message singleMessage(batchedMessage.impl_->messageId, batchedMessage.impl_->metadata, payload, metadata);
+    const MessageId& m = batchedMessage.impl_->messageId;
+    MessageId singleMessageId(m.partition(), m.ledgerId(), m.entryId(), batchIndex);
+    Message singleMessage(singleMessageId, batchedMessage.impl_->metadata, payload, metadata);
     singleMessage.impl_->cnx_ = batchedMessage.impl_->cnx_;
 
     return singleMessage;
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index bda48d26b..3746dc5a2 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -77,8 +77,7 @@ class Commands {
     static SharedBuffer newSubscribe(const std::string& topic, const std::string& subscription,
                                      uint64_t consumerId, uint64_t requestId,
                                      proto::CommandSubscribe_SubType subType, const std::string& consumerName,
-                                     SubscriptionMode subscriptionMode,
-                                     Optional<BatchMessageId> startMessageId);
+                                     SubscriptionMode subscriptionMode, Optional<MessageId> startMessageId);
 
     static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t requestId);
 
@@ -106,7 +105,7 @@ class Commands {
     static void serializeSingleMessageInBatchWithPayload(const Message& msg, SharedBuffer& batchPayLoad,
                                                          const unsigned long& maxMessageSizeInBytes);
 
-    static Message deSerializeSingleMessageInBatch(Message& batchedMessage);
+    static Message deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex);
 
     static SharedBuffer newConsumerStats(uint64_t consumerId, uint64_t requestId);
 
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 55e4e6cec..2abbdd05d 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -36,8 +36,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
                            const std::string& subscription, const ConsumerConfiguration& conf,
                            const ExecutorServicePtr listenerExecutor /* = NULL by default */,
                            const ConsumerTopicType consumerTopicType /* = NonPartitioned by default */,
-                           Commands::SubscriptionMode subscriptionMode,
-                           Optional<BatchMessageId> startMessageId)
+                           Commands::SubscriptionMode subscriptionMode, Optional<MessageId> startMessageId)
     : HandlerBase(client, topic, Backoff(milliseconds(100), seconds(60), milliseconds(0))),
       waitingForZeroQueueSizeMessage(false),
       config_(conf),
@@ -122,7 +121,7 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
         return;
     }
 
-    Optional<BatchMessageId> firstMessageInQueue = clearReceiveQueue();
+    Optional<MessageId> firstMessageInQueue = clearReceiveQueue();
     unAckedMessageTrackerPtr_->clear();
     batchAcknowledgementTracker_.clear();
 
@@ -271,8 +270,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
         return;
     }
 
-    Message m(msg, metadata, payload);
-    m.impl_->messageId.partition_ = partitionIndex_;
+    Message m(msg, metadata, payload, partitionIndex_);
     m.impl_->cnx_ = cnx.get();
 
     LOG_DEBUG(getName() << " metadata.num_messages_in_batch() = " << metadata.num_messages_in_batch());
@@ -320,18 +318,17 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
     int skippedMessages = 0;
 
     for (int i = 0; i < batchSize; i++) {
-        batchedMessage.impl_->messageId.batchIndex_ = i;
         // This is a cheap copy since message contains only one shared pointer (impl_)
-        Message msg = Commands::deSerializeSingleMessageInBatch(batchedMessage);
+        Message msg = Commands::deSerializeSingleMessageInBatch(batchedMessage, i);
 
         if (startMessageId_.is_present()) {
-            const BatchMessageId& msgId = static_cast<const BatchMessageId&>(msg.getMessageId());
+            const MessageId& msgId = msg.getMessageId();
 
             // If we are receiving a batch message, we need to discard messages that were prior
             // to the startMessageId
-            if (msgId.ledgerId_ == startMessageId_.value().ledgerId_ &&
-                msgId.entryId_ == startMessageId_.value().entryId_ &&
-                msgId.batchIndex_ <= startMessageId_.value().batchIndex_) {
+            if (msgId.ledgerId() == startMessageId_.value().ledgerId() &&
+                msgId.entryId() == startMessageId_.value().entryId() &&
+                msgId.batchIndex() <= startMessageId_.value().batchIndex()) {
                 LOG_DEBUG(getName() << "Ignoring message from before the startMessageId"
                                     << msg.getMessageId());
                 ++skippedMessages;
@@ -559,7 +556,7 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) {
 
 void ConsumerImpl::messageProcessed(Message& msg) {
     Lock lock(mutex_);
-    lastDequedMessage_ = Optional<BatchMessageId>::of(static_cast<const BatchMessageId&>(msg.getMessageId()));
+    lastDequedMessage_ = Optional<MessageId>::of(msg.getMessageId());
 
     ClientConnectionPtr currentCnx = getCnx().lock();
     if (currentCnx && msg.impl_->cnx_ != currentCnx.get()) {
@@ -575,23 +572,19 @@ void ConsumerImpl::messageProcessed(Message& msg) {
  * was
  * not seen by the application
  */
-Optional<BatchMessageId> ConsumerImpl::clearReceiveQueue() {
+Optional<MessageId> ConsumerImpl::clearReceiveQueue() {
     Message nextMessageInQueue;
     if (incomingMessages_.peekAndClear(nextMessageInQueue)) {
         // There was at least one message pending in the queue
-        // We can safely cast to 'BatchMessageId' since all the messages queued will have that type of message
-        // id,
-        // irrespective of whether they were part of a batch or not.
-        const BatchMessageId& nextMessageId =
-            static_cast<const BatchMessageId&>(nextMessageInQueue.getMessageId());
-        BatchMessageId previousMessageId;
-        if (nextMessageId.batchIndex_ >= 0) {
-            previousMessageId = BatchMessageId(nextMessageId.ledgerId_, nextMessageId.entryId_,
-                                               nextMessageId.batchIndex_ - 1);
+        const MessageId& nextMessageId = nextMessageInQueue.getMessageId();
+        MessageId previousMessageId;
+        if (nextMessageId.batchIndex() >= 0) {
+            previousMessageId = MessageId(-1, nextMessageId.ledgerId(), nextMessageId.entryId(),
+                                          nextMessageId.batchIndex() - 1);
         } else {
-            previousMessageId = BatchMessageId(nextMessageId.ledgerId_, nextMessageId.entryId_ - 1, -1);
+            previousMessageId = MessageId(-1, nextMessageId.ledgerId(), nextMessageId.entryId() - 1, -1);
         }
-        return Optional<BatchMessageId>::of(previousMessageId);
+        return Optional<MessageId>::of(previousMessageId);
     } else if (lastDequedMessage_.is_present()) {
         // If the queue was empty we need to restart from the message just after the last one that has been
         // dequeued
@@ -646,23 +639,21 @@ void ConsumerImpl::statsCallback(Result res, ResultCallback callback, proto::Com
 void ConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) {
     ResultCallback cb =
         boost::bind(&ConsumerImpl::statsCallback, this, _1, callback, proto::CommandAck_AckType_Individual);
-    const BatchMessageId& batchMsgId = (const BatchMessageId&)msgId;
-    if (batchMsgId.batchIndex_ != -1 &&
-        !batchAcknowledgementTracker_.isBatchReady(batchMsgId, proto::CommandAck_AckType_Individual)) {
+    if (msgId.batchIndex() != -1 &&
+        !batchAcknowledgementTracker_.isBatchReady(msgId, proto::CommandAck_AckType_Individual)) {
         cb(ResultOk);
         return;
     }
-    doAcknowledge(batchMsgId, proto::CommandAck_AckType_Individual, cb);
+    doAcknowledge(msgId, proto::CommandAck_AckType_Individual, cb);
 }
 
-void ConsumerImpl::acknowledgeCumulativeAsync(const MessageId& mId, ResultCallback callback) {
+void ConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) {
     ResultCallback cb =
         boost::bind(&ConsumerImpl::statsCallback, this, _1, callback, proto::CommandAck_AckType_Cumulative);
-    const BatchMessageId& msgId = (const BatchMessageId&)mId;
-    if (msgId.batchIndex_ != -1 &&
+    if (msgId.batchIndex() != -1 &&
         !batchAcknowledgementTracker_.isBatchReady(msgId, proto::CommandAck_AckType_Cumulative)) {
-        BatchMessageId messageId = batchAcknowledgementTracker_.getGreatestCumulativeAckReady(msgId);
-        if (messageId == BatchMessageId()) {
+        MessageId messageId = batchAcknowledgementTracker_.getGreatestCumulativeAckReady(msgId);
+        if (messageId == MessageId()) {
             // nothing to ack
             cb(ResultOk);
         } else {
@@ -673,11 +664,11 @@ void ConsumerImpl::acknowledgeCumulativeAsync(const MessageId& mId, ResultCallba
     }
 }
 
-void ConsumerImpl::doAcknowledge(const BatchMessageId& messageId, proto::CommandAck_AckType ackType,
+void ConsumerImpl::doAcknowledge(const MessageId& messageId, proto::CommandAck_AckType ackType,
                                  ResultCallback callback) {
     proto::MessageIdData messageIdData;
-    messageIdData.set_ledgerid(messageId.ledgerId_);
-    messageIdData.set_entryid(messageId.entryId_);
+    messageIdData.set_ledgerid(messageId.ledgerId());
+    messageIdData.set_entryid(messageId.entryId());
     ClientConnectionPtr cnx = getCnx().lock();
     if (cnx) {
         SharedBuffer cmd = Commands::newAck(consumerId_, messageIdData, ackType, -1);
@@ -687,7 +678,7 @@ void ConsumerImpl::doAcknowledge(const BatchMessageId& messageId, proto::Command
         } else {
             unAckedMessageTrackerPtr_->removeMessagesTill(messageId);
         }
-        batchAcknowledgementTracker_.deleteAckedMessage((BatchMessageId&)messageId, ackType);
+        batchAcknowledgementTracker_.deleteAckedMessage(messageId, ackType);
         callback(ResultOk);
         LOG_DEBUG(getName() << "ack request sent for message - [" << messageIdData.ledgerid() << ","
                             << messageIdData.entryid() << "]");
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index d115ed83e..45a12efbe 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -20,7 +20,7 @@
 #define LIB_CONSUMERIMPL_H_
 
 #include <string>
-#include "pulsar/BatchMessageId.h"
+
 #include "pulsar/Result.h"
 #include "UnboundedBlockingQueue.h"
 #include "HandlerBase.h"
@@ -69,7 +69,7 @@ class ConsumerImpl : public ConsumerImplBase,
                  const ExecutorServicePtr listenerExecutor = ExecutorServicePtr(),
                  const ConsumerTopicType consumerTopicType = NonPartitioned,
                  Commands::SubscriptionMode = Commands::SubscriptionModeDurable,
-                 Optional<BatchMessageId> startMessageId = Optional<BatchMessageId>::empty());
+                 Optional<MessageId> startMessageId = Optional<MessageId>::empty());
     ~ConsumerImpl();
     void setPartitionIndex(int partitionIndex);
     int getPartitionIndex();
@@ -82,7 +82,7 @@ class ConsumerImpl : public ConsumerImplBase,
     inline proto::CommandSubscribe_SubType getSubType();
     void unsubscribeAsync(ResultCallback callback);
     void handleUnsubscribe(Result result, ResultCallback callback);
-    void doAcknowledge(const BatchMessageId& messageId, proto::CommandAck_AckType ackType,
+    void doAcknowledge(const MessageId& messageId, proto::CommandAck_AckType ackType,
                        ResultCallback callback);
     virtual void disconnectConsumer();
     virtual Future<Result, ConsumerImplBaseWeakPtr> getConsumerCreatedFuture();
@@ -134,7 +134,7 @@ class ConsumerImpl : public ConsumerImplBase,
     Result receiveHelper(Message& msg, int timeout);
     void statsCallback(Result, ResultCallback, proto::CommandAck_AckType);
 
-    Optional<BatchMessageId> clearReceiveQueue();
+    Optional<MessageId> clearReceiveQueue();
 
     boost::mutex mutexForReceiveWithZeroQueueSize;
     const ConsumerConfiguration config_;
@@ -145,15 +145,15 @@ class ConsumerImpl : public ConsumerImplBase,
     ConsumerTopicType consumerTopicType_;
 
     Commands::SubscriptionMode subscriptionMode_;
-    Optional<BatchMessageId> startMessageId_;
+    Optional<MessageId> startMessageId_;
 
-    Optional<BatchMessageId> lastDequedMessage_;
+    Optional<MessageId> lastDequedMessage_;
     UnboundedBlockingQueue<Message> incomingMessages_;
     int availablePermits_;
     uint64_t consumerId_;
     std::string consumerName_;
     std::string consumerStr_;
-    short partitionIndex_;
+    int32_t partitionIndex_;
     Promise<Result, ConsumerImplBaseWeakPtr> consumerCreatedPromise_;
     bool messageListenerRunning_;
     boost::mutex messageListenerMutex_;
diff --git a/pulsar-client-cpp/lib/Message.cc b/pulsar-client-cpp/lib/Message.cc
index 79d7ca463..6b46fd077 100644
--- a/pulsar-client-cpp/lib/Message.cc
+++ b/pulsar-client-cpp/lib/Message.cc
@@ -34,7 +34,7 @@ using namespace pulsar;
 namespace pulsar {
 
 const static std::string emptyString;
-const static BatchMessageId invalidMessageId;
+const static MessageId invalidMessageId;
 
 const Message::StringMap& Message::getProperties() const { return impl_->properties(); }
 
@@ -62,14 +62,17 @@ Message::Message() : impl_() {}
 
 Message::Message(MessageImplPtr& impl) : impl_(impl) {}
 
-Message::Message(const proto::CommandMessage& msg, proto::MessageMetadata& metadata, SharedBuffer& payload)
+Message::Message(const proto::CommandMessage& msg, proto::MessageMetadata& metadata, SharedBuffer& payload,
+                 int32_t partition)
     : impl_(boost::make_shared<MessageImpl>()) {
-    impl_->messageId = BatchMessageId(msg.message_id().ledgerid(), msg.message_id().entryid());
+    impl_->messageId =
+        MessageId(partition, msg.message_id().ledgerid(), msg.message_id().entryid(), /* batchId */
+                  -1);
     impl_->metadata = metadata;
     impl_->payload = payload;
 }
 
-Message::Message(const BatchMessageId& messageID, proto::MessageMetadata& metadata, SharedBuffer& payload,
+Message::Message(const MessageId& messageID, proto::MessageMetadata& metadata, SharedBuffer& payload,
                  proto::SingleMessageMetadata& singleMetadata)
     : impl_(boost::make_shared<MessageImpl>()) {
     impl_->messageId = messageID;
diff --git a/pulsar-client-cpp/lib/MessageId.cc b/pulsar-client-cpp/lib/MessageId.cc
index dc1ff38ed..c5314d89a 100644
--- a/pulsar-client-cpp/lib/MessageId.cc
+++ b/pulsar-client-cpp/lib/MessageId.cc
@@ -18,9 +18,9 @@
  */
 
 #include <pulsar/MessageId.h>
-#include <pulsar/BatchMessageId.h>
 
 #include "PulsarApi.pb.h"
+#include "MessageIdImpl.h"
 
 #include <iostream>
 #include <limits>
@@ -30,43 +30,40 @@
 
 namespace pulsar {
 
-MessageId::MessageId() : ledgerId_(-1), entryId_(-1), partition_(-1) {}
+MessageId::MessageId() {
+    static const MessageIdImplPtr emptyMessageId = boost::make_shared<MessageIdImpl>();
+    impl_ = emptyMessageId;
+}
 
 MessageId& MessageId::operator=(const MessageId& m) {
-    entryId_ = m.entryId_;
-    ledgerId_ = m.ledgerId_;
-    partition_ = m.partition_;
+    impl_ = m.impl_;
     return *this;
 }
 
-MessageId::MessageId(int64_t ledgerId, int64_t entryId)
-    : ledgerId_(ledgerId), entryId_(entryId), partition_(-1) {
-    // partition is set explicitly in consumerImpl when message is received
-    // consumer's partition is assigned to this partition
-}
-
-int64_t MessageId::getBatchIndex() const {
-    // It's only relevant for batch message ids
-    return -1;
-}
+MessageId::MessageId(int32_t partition, int64_t ledgerId, int64_t entryId, int32_t batchIndex)
+    : impl_(boost::make_shared<MessageIdImpl>(partition, ledgerId, entryId, batchIndex)) {}
 
 const MessageId& MessageId::earliest() {
-    static const BatchMessageId _earliest(-1, -1);
+    static const MessageId _earliest(-1, -1, -1, -1);
     return _earliest;
 }
 
 const MessageId& MessageId::latest() {
-    // For entry-id we only have 48bits
-    static const BatchMessageId _latest(std::numeric_limits<int64_t>::max(), (int64_t)(pow(2, 47) - 1));
+    static const int64_t long_max = std::numeric_limits<int64_t>::max();
+    static const MessageId _latest(-1, long_max, long_max, -1);
     return _latest;
 }
 
 void MessageId::serialize(std::string& result) const {
     proto::MessageIdData idData;
-    idData.set_ledgerid(ledgerId_);
-    idData.set_entryid(entryId_);
-    if (partition_ != -1) {
-        idData.set_partition(partition_);
+    idData.set_ledgerid(impl_->ledgerId_);
+    idData.set_entryid(impl_->entryId_);
+    if (impl_->partition_ != -1) {
+        idData.set_partition(impl_->partition_);
+    }
+
+    if (impl_->batchIndex_ != -1) {
+        idData.set_batch_index(impl_->batchIndex_);
     }
 
     idData.SerializeToString(&result);
@@ -75,38 +72,63 @@ void MessageId::serialize(std::string& result) const {
 /**
  * Deserialize a message id from a binary string
  */
-boost::shared_ptr<MessageId> MessageId::deserialize(const std::string& serializedMessageId) {
+MessageId MessageId::deserialize(const std::string& serializedMessageId) {
     proto::MessageIdData idData;
     if (!idData.ParseFromString(serializedMessageId)) {
         throw "Failed to parse serialized message id";
     }
 
-    return boost::make_shared<BatchMessageId>(idData.ledgerid(), idData.entryid(), idData.batch_index());
+    return MessageId(idData.partition(), idData.ledgerid(), idData.entryid(), idData.batch_index());
 }
 
+int64_t MessageId::ledgerId() const { return impl_->ledgerId_; }
+
+int64_t MessageId::entryId() const { return impl_->entryId_; }
+
+int32_t MessageId::batchIndex() const { return impl_->batchIndex_; }
+
+int32_t MessageId::partition() const { return impl_->partition_; }
+
 #pragma GCC visibility push(default)
+
 std::ostream& operator<<(std::ostream& s, const pulsar::MessageId& messageId) {
-    s << '(' << messageId.ledgerId_ << ',' << messageId.entryId_ << ',' << messageId.partition_ << ')';
+    s << '(' << messageId.impl_->ledgerId_ << ',' << messageId.impl_->entryId_ << ','
+      << messageId.impl_->batchIndex_ << ',' << messageId.impl_->partition_ << ')';
     return s;
 }
 
 bool MessageId::operator<(const MessageId& other) const {
-    if (ledgerId_ < other.ledgerId_) {
+    if (impl_->ledgerId_ < other.impl_->ledgerId_) {
+        return true;
+    } else if (impl_->ledgerId_ > other.impl_->ledgerId_) {
+        return false;
+    }
+
+    if (impl_->entryId_ < other.impl_->entryId_) {
         return true;
-    } else if (ledgerId_ > other.ledgerId_) {
+    } else if (impl_->entryId_ > other.impl_->entryId_) {
         return false;
     }
 
-    if (entryId_ < other.entryId_) {
+    if (impl_->batchIndex_ < other.impl_->batchIndex_) {
         return true;
     } else {
         return false;
     }
 }
 
+bool MessageId::operator<=(const MessageId& other) const { return *this < other || *this == other; }
+
+bool MessageId::operator>(const MessageId& other) const { return !(*this <= other); }
+
+bool MessageId::operator>=(const MessageId& other) const { return !(*this < other); }
+
 bool MessageId::operator==(const MessageId& other) const {
-    return ledgerId_ == other.ledgerId_ && entryId_ == other.entryId_;
+    return impl_->ledgerId_ == other.impl_->ledgerId_ && impl_->entryId_ == other.impl_->entryId_ &&
+           impl_->batchIndex_ == other.impl_->batchIndex_ && impl_->partition_ == other.impl_->partition_;
 }
 
+bool MessageId::operator!=(const MessageId& other) const { return !(*this == other); }
+
 #pragma GCC visibility pop
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/MessageIdImpl.h b/pulsar-client-cpp/lib/MessageIdImpl.h
new file mode 100644
index 000000000..a3fc1714a
--- /dev/null
+++ b/pulsar-client-cpp/lib/MessageIdImpl.h
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+
+namespace pulsar {
+
+class MessageIdImpl {
+   public:
+    MessageIdImpl() : ledgerId_(-1), entryId_(-1), partition_(-1), batchIndex_(-1) {}
+    MessageIdImpl(int32_t partition, int64_t ledgerId, int64_t entryId, int32_t batchIndex)
+        : ledgerId_(ledgerId), entryId_(entryId), partition_(partition), batchIndex_(batchIndex) {}
+    const int64_t ledgerId_;
+    const int64_t entryId_;
+    const int32_t partition_;
+    const int32_t batchIndex_;
+};
+}  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/MessageImpl.h b/pulsar-client-cpp/lib/MessageImpl.h
index 23a8fcf51..f3753b1cf 100644
--- a/pulsar-client-cpp/lib/MessageImpl.h
+++ b/pulsar-client-cpp/lib/MessageImpl.h
@@ -21,7 +21,6 @@
 
 #include <pulsar/Message.h>
 #include <pulsar/MessageId.h>
-#include "pulsar/BatchMessageId.h"
 #include "SharedBuffer.h"
 #include "PulsarApi.pb.h"
 
@@ -42,7 +41,7 @@ class MessageImpl {
 
     proto::MessageMetadata metadata;
     SharedBuffer payload;
-    BatchMessageId messageId;
+    MessageId messageId;
     ClientConnection* cnx_;
 
     const std::string& getPartitionKey() const;
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index f9c02b39e..dc218efe1 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -150,7 +150,7 @@ void PartitionedConsumerImpl::handleUnsubscribeAsync(Result result, unsigned int
 }
 
 void PartitionedConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) {
-    int partition = msgId.partition_;
+    int32_t partition = msgId.partition();
     assert(partition < numPartitions_ && partition >= 0 && consumers_.size() > partition);
     unAckedMessageTrackerPtr_->remove(msgId);
     consumers_[partition]->acknowledgeAsync(msgId, callback);
@@ -313,7 +313,7 @@ bool PartitionedConsumerImpl::isOpen() {
 }
 
 void PartitionedConsumerImpl::messageReceived(Consumer consumer, const Message& msg) {
-    LOG_DEBUG("Received Message from one of the partition - " << msg.impl_->messageId.partition_);
+    LOG_DEBUG("Received Message from one of the partition - " << msg.impl_->messageId.partition());
     messages_.push(msg);
     if (messageListener_) {
         listenerExecutor_->postWork(
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index 728573a4d..6808aad2d 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -130,7 +130,6 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac
     }
     // find a producer for that partition, index should start from 0
     ProducerImplPtr& producer = producers_[partition];
-    msg.impl_->messageId.partition_ = partition;
     // send message on that partition
     producer->sendAsync(msg, callback);
 }
diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc b/pulsar-client-cpp/lib/ReaderImpl.cc
index 308908ddb..5e69a72fc 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.cc
+++ b/pulsar-client-cpp/lib/ReaderImpl.cc
@@ -28,7 +28,7 @@ ReaderImpl::ReaderImpl(const ClientImplPtr client, const std::string& topic, con
                        const ExecutorServicePtr listenerExecutor, ReaderCallback readerCreatedCallback)
     : topic_(topic), client_(client), readerConf_(conf), readerCreatedCallback_(readerCreatedCallback) {}
 
-void ReaderImpl::start(const BatchMessageId& startMessageId) {
+void ReaderImpl::start(const MessageId& startMessageId) {
     ConsumerConfiguration consumerConf;
     consumerConf.setConsumerType(ConsumerExclusive);
     consumerConf.setReceiverQueueSize(readerConf_.getReceiverQueueSize());
@@ -51,7 +51,7 @@ void ReaderImpl::start(const BatchMessageId& startMessageId) {
 
     consumer_ = boost::make_shared<ConsumerImpl>(
         client_.lock(), topic_, subscription, consumerConf, ExecutorServicePtr(), NonPartitioned,
-        Commands::SubscriptionModeNonDurable, Optional<BatchMessageId>::of(startMessageId));
+        Commands::SubscriptionModeNonDurable, Optional<MessageId>::of(startMessageId));
     consumer_->getConsumerCreatedFuture().addListener(
         boost::bind(&ReaderImpl::handleConsumerCreated, shared_from_this(), _1, _2));
     consumer_->start();
@@ -87,13 +87,11 @@ void ReaderImpl::acknowledgeIfNecessary(Result result, const Message& msg) {
         return;
     }
 
-    const BatchMessageId& msgId = static_cast<const BatchMessageId&>(msg.getMessageId());
-
     // Only acknowledge on the first message in the batch
-    if (msgId.batchIndex_ <= 0) {
+    if (msg.getMessageId().batchIndex() <= 0) {
         // Acknowledge message immediately because the reader is based on non-durable
         // subscription. When it reconnects, it will specify the subscription position anyway
-        consumer_->acknowledgeCumulativeAsync(msgId, emptyCallback);
+        consumer_->acknowledgeCumulativeAsync(msg.getMessageId(), emptyCallback);
     }
 }
 
diff --git a/pulsar-client-cpp/lib/ReaderImpl.h b/pulsar-client-cpp/lib/ReaderImpl.h
index 97c7f6b79..61216b84d 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.h
+++ b/pulsar-client-cpp/lib/ReaderImpl.h
@@ -36,7 +36,7 @@ class ReaderImpl : public boost::enable_shared_from_this<ReaderImpl> {
     ReaderImpl(const ClientImplPtr client, const std::string& topic, const ReaderConfiguration& conf,
                const ExecutorServicePtr listenerExecutor, ReaderCallback readerCreatedCallback);
 
-    void start(const BatchMessageId& startMessageId);
+    void start(const MessageId& startMessageId);
 
     const std::string& getTopic() const;
 
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
index 593060611..90006b621 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
@@ -85,14 +85,14 @@ long UnAckedMessageTrackerEnabled::size() {
 void UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) {
     boost::unique_lock<boost::mutex> acquire(lock_);
     for (std::set<MessageId>::iterator it = oldSet_.begin(); it != oldSet_.end();) {
-        if (*it < msgId && it->partition_ == msgId.partition_) {
+        if (*it < msgId && it->partition() == msgId.partition()) {
             oldSet_.erase(it++);
         } else {
             it++;
         }
     }
     for (std::set<MessageId>::iterator it = currentSet_.begin(); it != currentSet_.end();) {
-        if (*it < msgId && it->partition_ == msgId.partition_) {
+        if (*it < msgId && it->partition() == msgId.partition()) {
             currentSet_.erase(it++);
         } else {
             it++;
diff --git a/pulsar-client-cpp/python/src/client.cc b/pulsar-client-cpp/python/src/client.cc
index ad78ed24a..b5295acf2 100644
--- a/pulsar-client-cpp/python/src/client.cc
+++ b/pulsar-client-cpp/python/src/client.cc
@@ -44,7 +44,7 @@ Consumer Client_subscribe(Client& client, const std::string& topic, const std::s
 }
 
 Reader Client_createReader(Client& client, const std::string& topic,
-                           const BatchMessageId& startMessageId,
+                           const MessageId& startMessageId,
                            const ReaderConfiguration& conf) {
     Reader reader;
     Result res;
diff --git a/pulsar-client-cpp/python/src/message.cc b/pulsar-client-cpp/python/src/message.cc
index b7513d3a9..1d1ee235c 100644
--- a/pulsar-client-cpp/python/src/message.cc
+++ b/pulsar-client-cpp/python/src/message.cc
@@ -20,13 +20,13 @@
 
 #include <boost/python/suite/indexing/map_indexing_suite.hpp>
 
-std::string MessageId_str(const BatchMessageId& msgId) {
+std::string MessageId_str(const MessageId& msgId) {
     std::stringstream ss;
     ss << msgId;
     return ss.str();
 }
 
-std::string MessageId_serialize(const BatchMessageId& msgId) {
+std::string MessageId_serialize(const MessageId& msgId) {
     std::string serialized;
     msgId.serialize(serialized);
     return serialized;
@@ -42,8 +42,8 @@ boost::python::object Message_data(const Message& msg) {
     return boost::python::object(boost::python::handle<>(PyBytes_FromStringAndSize((const char*)msg.getData(), msg.getLength())));
 }
 
-const BatchMessageId& Message_getMessageId(const Message& msg) {
-    return static_cast<const BatchMessageId&>(msg.getMessageId());
+const MessageId& Message_getMessageId(const Message& msg) {
+    return msg.getMessageId();
 }
 
 void export_message() {
@@ -67,10 +67,10 @@ void export_message() {
             .def(map_indexing_suite<Message::StringMap>())
             ;
 
-    static const BatchMessageId& _MessageId_earliest = static_cast<const BatchMessageId&>(MessageId::earliest());
-    static const BatchMessageId& _MessageId_latest = static_cast<const BatchMessageId&>(MessageId::latest());
+    static const MessageId& _MessageId_earliest = MessageId::earliest();
+    static const MessageId& _MessageId_latest = MessageId::latest();
 
-    class_<BatchMessageId, boost::shared_ptr<BatchMessageId> >("MessageId")
+    class_<MessageId>("MessageId")
             .def("__str__", &MessageId_str)
             .add_static_property("earliest", make_getter(&_MessageId_earliest))
             .add_static_property("latest", make_getter(&_MessageId_latest))
diff --git a/pulsar-client-cpp/tests/BatchMessageTest.cc b/pulsar-client-cpp/tests/BatchMessageTest.cc
index dda8f79ff..34c38db84 100644
--- a/pulsar-client-cpp/tests/BatchMessageTest.cc
+++ b/pulsar-client-cpp/tests/BatchMessageTest.cc
@@ -228,7 +228,7 @@ TEST(BatchMessageTest, testBatchSizeInBytes) {
         std::string expectedMessageContent = prefix + boost::lexical_cast<std::string>(i);
         LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = "
                                                        << receivedMsg.getMessageId() << "]");
-        ASSERT_LT(pulsar::PulsarFriend::getBatchIndex((BatchMessageId&)receivedMsg.getMessageId()), 2);
+        ASSERT_LT(pulsar::PulsarFriend::getBatchIndex(receivedMsg.getMessageId()), 2);
         ASSERT_EQ(receivedMsg.getProperty("msgIndex"), boost::lexical_cast<std::string>(i++));
         ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
         ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
diff --git a/pulsar-client-cpp/tests/MessageIdTest.cc b/pulsar-client-cpp/tests/MessageIdTest.cc
index 7402143eb..06c252837 100644
--- a/pulsar-client-cpp/tests/MessageIdTest.cc
+++ b/pulsar-client-cpp/tests/MessageIdTest.cc
@@ -16,7 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <pulsar/BatchMessageId.h>
+#include <pulsar/MessageId.h>
+#include "PulsarFriend.h"
 
 #include <gtest/gtest.h>
 
@@ -25,12 +26,12 @@
 using namespace pulsar;
 
 TEST(MessageIdTest, testSerialization) {
-    BatchMessageId msgId(1, 2, 3);
+    MessageId msgId = PulsarFriend::getMessageId(-1, 1, 2, 3);
 
     std::string serialized;
     msgId.serialize(serialized);
 
-    boost::shared_ptr<MessageId> deserialized = MessageId::deserialize(serialized);
+    MessageId deserialized = MessageId::deserialize(serialized);
 
-    ASSERT_EQ(msgId, static_cast<const BatchMessageId&>(*deserialized));
+    ASSERT_EQ(msgId, deserialized);
 }
diff --git a/pulsar-client-cpp/tests/PulsarFriend.h b/pulsar-client-cpp/tests/PulsarFriend.h
index c64bab2d8..f2d1a699c 100644
--- a/pulsar-client-cpp/tests/PulsarFriend.h
+++ b/pulsar-client-cpp/tests/PulsarFriend.h
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <pulsar/BatchMessageId.h>
+
 #include <lib/ProducerImpl.h>
 #include <lib/ConsumerImpl.h>
 #include <string>
@@ -26,7 +26,11 @@ using std::string;
 namespace pulsar {
 class PulsarFriend {
    public:
-    static int getBatchIndex(const BatchMessageId& mId) { return mId.batchIndex_; }
+    static MessageId getMessageId(int32_t partition, int64_t ledgerId, int64_t entryId, int32_t batchIndex) {
+        return MessageId(partition, ledgerId, entryId, batchIndex);
+    }
+
+    static int getBatchIndex(const MessageId& mId) { return mId.batchIndex(); }
 
     static ProducerStatsImplPtr getProducerStatsPtr(Producer producer) {
         ProducerImpl* producerImpl = static_cast<ProducerImpl*>(producer.impl_.get());
diff --git a/pulsar-client-cpp/tests/ReaderTest.cc b/pulsar-client-cpp/tests/ReaderTest.cc
index 0e31f6a14..be9678daa 100644
--- a/pulsar-client-cpp/tests/ReaderTest.cc
+++ b/pulsar-client-cpp/tests/ReaderTest.cc
@@ -257,7 +257,7 @@ TEST(ReaderTest, testReaderOnSpecificMessageWithBatches) {
     // Create another reader starting on msgid4
     auto msgId4 = MessageId::deserialize(lastMessageId);
     Reader reader2;
-    ASSERT_EQ(ResultOk, client.createReader(topicName, *msgId4, readerConf, reader2));
+    ASSERT_EQ(ResultOk, client.createReader(topicName, msgId4, readerConf, reader2));
 
     for (int i = 5; i < 11; i++) {
         Message msg;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services