You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/03/25 23:21:01 UTC
[incubator-pulsar] branch master updated: Use private impl for MessageId in c++ client (#1322)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5efafc5 Use private impl for MessageId in c++ client (#1322)
5efafc5 is described below
commit 5efafc57fd81eaf622fe65548facfec00af5b49b
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sun Mar 25 16:20:58 2018 -0700
Use private impl for MessageId in c++ client (#1322)
---
pulsar-client-cpp/include/pulsar/BatchMessageId.h | 67 ------------------
pulsar-client-cpp/include/pulsar/Message.h | 10 +--
pulsar-client-cpp/include/pulsar/MessageId.h | 33 +++++----
.../lib/BatchAcknowledgementTracker.cc | 40 ++++++-----
.../lib/BatchAcknowledgementTracker.h | 27 ++++----
pulsar-client-cpp/lib/BatchMessageId.cc | 76 --------------------
pulsar-client-cpp/lib/ClientImpl.cc | 4 +-
pulsar-client-cpp/lib/ClientImpl.h | 2 +-
pulsar-client-cpp/lib/Commands.cc | 16 +++--
pulsar-client-cpp/lib/Commands.h | 5 +-
pulsar-client-cpp/lib/ConsumerImpl.cc | 65 ++++++++----------
pulsar-client-cpp/lib/ConsumerImpl.h | 14 ++--
pulsar-client-cpp/lib/Message.cc | 11 +--
pulsar-client-cpp/lib/MessageId.cc | 80 ++++++++++++++--------
.../MessageIdTest.cc => lib/MessageIdImpl.h} | 28 ++++----
pulsar-client-cpp/lib/MessageImpl.h | 3 +-
pulsar-client-cpp/lib/PartitionedConsumerImpl.cc | 4 +-
pulsar-client-cpp/lib/PartitionedProducerImpl.cc | 1 -
pulsar-client-cpp/lib/ReaderImpl.cc | 10 ++-
pulsar-client-cpp/lib/ReaderImpl.h | 2 +-
.../lib/UnAckedMessageTrackerEnabled.cc | 4 +-
pulsar-client-cpp/python/src/client.cc | 2 +-
pulsar-client-cpp/python/src/message.cc | 14 ++--
pulsar-client-cpp/tests/BatchMessageTest.cc | 2 +-
pulsar-client-cpp/tests/MessageIdTest.cc | 9 +--
pulsar-client-cpp/tests/PulsarFriend.h | 8 ++-
pulsar-client-cpp/tests/ReaderTest.cc | 2 +-
27 files changed, 213 insertions(+), 326 deletions(-)
diff --git a/pulsar-client-cpp/include/pulsar/BatchMessageId.h b/pulsar-client-cpp/include/pulsar/BatchMessageId.h
deleted file mode 100644
index b647886..0000000
--- a/pulsar-client-cpp/include/pulsar/BatchMessageId.h
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#ifndef LIB_BATCHMESSAGEID_H_
-#define LIB_BATCHMESSAGEID_H_
-
-#include <pulsar/MessageId.h>
-#include <iosfwd>
-
-#pragma GCC visibility push(default)
-
-namespace pulsar {
-
-class PulsarWrapper;
-
-class BatchMessageId : public MessageId {
- public:
- BatchMessageId(int64_t ledgerId, int64_t entryId, int batchIndex = -1)
- : MessageId(ledgerId, entryId), batchIndex_(batchIndex) {}
-
- BatchMessageId(const MessageId& msgId);
-
- BatchMessageId() : batchIndex_(-1) {}
-
- virtual void serialize(std::string& result) const;
-
- // These functions compare the message order as stored in bookkeeper
- bool operator<(const BatchMessageId& other) const;
- bool operator<=(const BatchMessageId& other) const;
- bool operator==(const BatchMessageId& other) const;
-
- protected:
- virtual int64_t getBatchIndex() const;
-
- friend class Commands;
- friend class ConsumerImpl;
- friend class ReaderImpl;
- friend class Message;
- friend class MessageImpl;
- friend class PartitionedProducerImpl;
- friend class PartitionedConsumerImpl;
- friend class BatchAcknowledgementTracker;
- friend class PulsarWrapper;
- friend class PulsarFriend;
- int64_t batchIndex_;
-
- friend std::ostream& operator<<(std::ostream& s, const BatchMessageId& messageId);
-};
-} // namespace pulsar
-#pragma GCC visibility pop
-
-#endif /* LIB_BATCHMESSAGEID_H_ */
diff --git a/pulsar-client-cpp/include/pulsar/Message.h b/pulsar-client-cpp/include/pulsar/Message.h
index b98b48b..aff0d94 100644
--- a/pulsar-client-cpp/include/pulsar/Message.h
+++ b/pulsar-client-cpp/include/pulsar/Message.h
@@ -23,7 +23,8 @@
#include <string>
#include <boost/shared_ptr.hpp>
-#include "BatchMessageId.h"
+
+#include "MessageId.h"
#pragma GCC visibility push(default)
@@ -39,8 +40,6 @@ class MessageBuilder;
class MessageImpl;
class PulsarWrapper;
-// TODO: When releasing 2.0.0, make all methods virtual and create the virtual destructor for Google Mock
-// tests
class Message {
public:
typedef std::map<std::string, std::string> StringMap;
@@ -128,9 +127,10 @@ class Message {
MessageImplPtr impl_;
Message(MessageImplPtr& impl);
- Message(const proto::CommandMessage& msg, proto::MessageMetadata& data, SharedBuffer& payload);
+ Message(const proto::CommandMessage& msg, proto::MessageMetadata& data, SharedBuffer& payload,
+ int32_t partition);
/// Used for Batch Messages
- Message(const BatchMessageId& messageID, proto::MessageMetadata& metadata, SharedBuffer& payload,
+ Message(const MessageId& messageID, proto::MessageMetadata& metadata, SharedBuffer& payload,
proto::SingleMessageMetadata& singleMetadata);
friend class PartitionedProducerImpl;
friend class PartitionedConsumerImpl;
diff --git a/pulsar-client-cpp/include/pulsar/MessageId.h b/pulsar-client-cpp/include/pulsar/MessageId.h
index 40fe5d0..149d177 100644
--- a/pulsar-client-cpp/include/pulsar/MessageId.h
+++ b/pulsar-client-cpp/include/pulsar/MessageId.h
@@ -27,15 +27,12 @@
namespace pulsar {
-class ConsumerImpl;
-class UnAckedMessageTrackerEnabled;
-class PulsarWrapper;
+class MessageIdImpl;
class MessageId {
public:
MessageId& operator=(const MessageId&);
MessageId();
- virtual ~MessageId() {}
/**
* MessageId representing the "earliest" or "oldest available" message stored in the topic
@@ -50,34 +47,44 @@ class MessageId {
/**
* Serialize the message id into a binary string for storing
*/
- virtual void serialize(std::string& result) const;
+ void serialize(std::string& result) const;
/**
* Deserialize a message id from a binary string
*/
- static boost::shared_ptr<MessageId> deserialize(const std::string& serializedMessageId);
+ static MessageId deserialize(const std::string& serializedMessageId);
// These functions compare the message order as stored in bookkeeper
bool operator<(const MessageId& other) const;
+ bool operator<=(const MessageId& other) const;
+ bool operator>(const MessageId& other) const;
+ bool operator>=(const MessageId& other) const;
bool operator==(const MessageId& other) const;
+ bool operator!=(const MessageId& other) const;
- protected:
- virtual int64_t getBatchIndex() const;
+ private:
friend class ConsumerImpl;
+ friend class ReaderImpl;
friend class Message;
friend class MessageImpl;
friend class Commands;
- friend class BatchMessageId;
friend class PartitionedProducerImpl;
friend class PartitionedConsumerImpl;
friend class UnAckedMessageTrackerEnabled;
friend class BatchAcknowledgementTracker;
friend class PulsarWrapper;
- MessageId(int64_t, int64_t);
+ friend class PulsarFriend;
+
+ explicit MessageId(int32_t partition, int64_t ledgerId, int64_t entryId, int32_t batchIndex);
friend std::ostream& operator<<(std::ostream& s, const MessageId& messageId);
- int64_t ledgerId_;
- int64_t entryId_ : 48;
- short partition_ : 16;
+
+ int64_t ledgerId() const;
+ int64_t entryId() const;
+ int32_t batchIndex() const;
+ int32_t partition() const;
+
+ typedef boost::shared_ptr<MessageIdImpl> MessageIdImplPtr;
+ MessageIdImplPtr impl_;
};
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc b/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc
index 868c050..0dcf3be 100644
--- a/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc
+++ b/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc
@@ -24,7 +24,7 @@ DECLARE_LOG_OBJECT()
BatchAcknowledgementTracker::BatchAcknowledgementTracker(const std::string topic,
const std::string subscription,
const long consumerId)
- : greatestCumulativeAckSent_(BatchMessageId()) {
+ : greatestCumulativeAckSent_() {
std::stringstream consumerStrStream;
consumerStrStream << "BatchAcknowledgementTracker for [" << topic << ", " << subscription << ", "
<< consumerId << "] ";
@@ -43,7 +43,7 @@ void BatchAcknowledgementTracker::receivedMessage(const Message& message) {
return;
}
Lock lock(mutex_);
- BatchMessageId msgID = message.impl_->messageId;
+ MessageId msgID = message.impl_->messageId;
// ignore message if it is less than the last cumulative ack sent or messageID is already being tracked
TrackerMap::iterator pos = trackerMap_.find(msgID);
@@ -60,10 +60,10 @@ void BatchAcknowledgementTracker::receivedMessage(const Message& message) {
TrackerPair(msgID, boost::dynamic_bitset<>(message.impl_->metadata.num_messages_in_batch()).set()));
}
-void BatchAcknowledgementTracker::deleteAckedMessage(const BatchMessageId& messageId,
+void BatchAcknowledgementTracker::deleteAckedMessage(const MessageId& messageId,
proto::CommandAck_AckType ackType) {
// Not a batch message and a individual ack
- if (messageId.batchIndex_ == -1 && ackType == proto::CommandAck_AckType_Individual) {
+ if (messageId.batchIndex() == -1 && ackType == proto::CommandAck_AckType_Individual) {
return;
}
@@ -104,18 +104,22 @@ void BatchAcknowledgementTracker::deleteAckedMessage(const BatchMessageId& messa
}
}
-bool BatchAcknowledgementTracker::isBatchReady(const BatchMessageId& msgID,
+bool BatchAcknowledgementTracker::isBatchReady(const MessageId& msgID,
const proto::CommandAck_AckType ackType) {
Lock lock(mutex_);
- TrackerMap::iterator pos = trackerMap_.find(msgID);
- if (pos == trackerMap_.end() || std::find(sendList_.begin(), sendList_.end(), msgID) != sendList_.end()) {
+ // Remove batch index
+ MessageId batchMessageId = MessageId(msgID.partition(), msgID.ledgerId(), msgID.entryId(),
+ -1 /* Batch index */);
+
+ TrackerMap::iterator pos = trackerMap_.find(batchMessageId);
+ if (pos == trackerMap_.end() || std::find(sendList_.begin(), sendList_.end(), batchMessageId) != sendList_.end()) {
LOG_DEBUG(
"Batch is ready since message present in sendList_ or not present in trackerMap_ [message ID = "
- << msgID << "]");
+ << batchMessageId << "]");
return true;
}
- int batchIndex = msgID.batchIndex_;
+ int batchIndex = msgID.batchIndex();
assert(batchIndex < pos->second.size());
pos->second.set(batchIndex, false);
@@ -128,7 +132,7 @@ bool BatchAcknowledgementTracker::isBatchReady(const BatchMessageId& msgID,
if (pos->second.any()) {
return false;
}
- sendList_.push_back(msgID);
+ sendList_.push_back(batchMessageId);
trackerMap_.erase(pos);
LOG_DEBUG("Batch is ready since message all bits are reset in trackerMap_ [message ID = " << msgID
<< "]");
@@ -138,22 +142,24 @@ bool BatchAcknowledgementTracker::isBatchReady(const BatchMessageId& msgID,
// returns
// - a batch message id < messageId
// - same messageId if it is the last message in the batch
-const BatchMessageId BatchAcknowledgementTracker::getGreatestCumulativeAckReady(
- const BatchMessageId& messageId) {
+const MessageId BatchAcknowledgementTracker::getGreatestCumulativeAckReady(const MessageId& messageId) {
Lock lock(mutex_);
- BatchMessageId messageReadyForCumulativeAck = BatchMessageId();
- TrackerMap::iterator pos = trackerMap_.find(messageId);
+
+ // Remove batch index
+ MessageId batchMessageId = MessageId(messageId.partition(), messageId.ledgerId(),
+ messageId.entryId(), -1 /* Batch index */);
+ TrackerMap::iterator pos = trackerMap_.find(batchMessageId);
// element not found
if (pos == trackerMap_.end()) {
- return BatchMessageId();
+ return MessageId();
}
- if (pos->second.size() - 1 != messageId.batchIndex_) {
+ if (pos->second.size() - 1 != messageId.batchIndex()) {
// Can't cumulatively ack this batch message
if (pos == trackerMap_.begin()) {
// This was the first message hence we can't decrement the iterator
- return BatchMessageId();
+ return MessageId();
}
pos--;
}
diff --git a/pulsar-client-cpp/lib/BatchAcknowledgementTracker.h b/pulsar-client-cpp/lib/BatchAcknowledgementTracker.h
index b6f8ba8..6b20c00 100644
--- a/pulsar-client-cpp/lib/BatchAcknowledgementTracker.h
+++ b/pulsar-client-cpp/lib/BatchAcknowledgementTracker.h
@@ -19,7 +19,6 @@
#ifndef LIB_BATCHACKNOWLEDGEMENTTRACKER_H_
#define LIB_BATCHACKNOWLEDGEMENTTRACKER_H_
-#include "pulsar/BatchMessageId.h"
#include "MessageImpl.h"
#include <map>
#include <boost/thread/mutex.hpp>
@@ -36,8 +35,8 @@ class ConsumerImpl;
class BatchAcknowledgementTracker {
private:
typedef boost::unique_lock<boost::mutex> Lock;
- typedef std::pair<BatchMessageId, boost::dynamic_bitset<> > TrackerPair;
- typedef std::map<BatchMessageId, boost::dynamic_bitset<> > TrackerMap;
+ typedef std::pair<MessageId, boost::dynamic_bitset<> > TrackerPair;
+ typedef std::map<MessageId, boost::dynamic_bitset<> > TrackerMap;
boost::mutex mutex_;
TrackerMap trackerMap_;
@@ -48,20 +47,20 @@ class BatchAcknowledgementTracker {
// batch index
// is acked again, we just check the sendList to verify that the batch is acked w/o iterating over the
// dynamic_bitset.
- std::vector<BatchMessageId> sendList_;
+ std::vector<MessageId> sendList_;
// we don't need to track MessageId < greatestCumulativeAckReceived
- BatchMessageId greatestCumulativeAckSent_;
+ MessageId greatestCumulativeAckSent_;
std::string name_;
public:
BatchAcknowledgementTracker(const std::string topic, const std::string subscription,
const long consumerId);
- bool isBatchReady(const BatchMessageId& msgID, const proto::CommandAck_AckType ackType);
- const BatchMessageId getGreatestCumulativeAckReady(const BatchMessageId& messageId);
+ bool isBatchReady(const MessageId& msgID, const proto::CommandAck_AckType ackType);
+ const MessageId getGreatestCumulativeAckReady(const MessageId& messageId);
- void deleteAckedMessage(const BatchMessageId& messageId, proto::CommandAck_AckType ackType);
+ void deleteAckedMessage(const MessageId& messageId, proto::CommandAck_AckType ackType);
void receivedMessage(const Message& message);
void clear();
@@ -72,23 +71,23 @@ class BatchAcknowledgementTracker {
// Used for Cumulative acks only
struct SendRemoveCriteria {
private:
- const BatchMessageId& messageId_;
+ const MessageId& messageId_;
public:
- SendRemoveCriteria(const BatchMessageId& messageId) : messageId_(messageId) {}
+ SendRemoveCriteria(const MessageId& messageId) : messageId_(messageId) {}
- bool operator()(const BatchMessageId& element) const { return (element <= messageId_); }
+ bool operator()(const MessageId& element) const { return (element <= messageId_); }
};
// Used for Cumulative acks only
struct TrackerMapRemoveCriteria {
private:
- const BatchMessageId& messageId_;
+ const MessageId& messageId_;
public:
- TrackerMapRemoveCriteria(const BatchMessageId& messageId) : messageId_(messageId) {}
+ TrackerMapRemoveCriteria(const MessageId& messageId) : messageId_(messageId) {}
- bool operator()(std::pair<const pulsar::BatchMessageId, boost::dynamic_bitset<> >& element) const {
+ bool operator()(std::pair<const MessageId, boost::dynamic_bitset<> >& element) const {
return (element.first <= messageId_);
}
};
diff --git a/pulsar-client-cpp/lib/BatchMessageId.cc b/pulsar-client-cpp/lib/BatchMessageId.cc
deleted file mode 100644
index 27d7bbd..0000000
--- a/pulsar-client-cpp/lib/BatchMessageId.cc
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <pulsar/BatchMessageId.h>
-
-#include "PulsarApi.pb.h"
-
-#include <tuple>
-#include <iostream>
-
-namespace pulsar {
-
-BatchMessageId::BatchMessageId(const MessageId& msgId)
- : MessageId(msgId.ledgerId_, msgId.entryId_), batchIndex_(msgId.getBatchIndex()) {}
-
-void BatchMessageId::serialize(std::string& result) const {
- proto::MessageIdData idData;
- idData.set_ledgerid(ledgerId_);
- idData.set_entryid(entryId_);
- idData.set_batch_index(batchIndex_);
-
- if (partition_ != -1) {
- idData.set_partition(partition_);
- }
-
- idData.SerializeToString(&result);
-}
-
-int64_t BatchMessageId::getBatchIndex() const { return batchIndex_; }
-
-#pragma GCC visibility push(default)
-
-bool BatchMessageId::operator<(const BatchMessageId& other) const {
- if (ledgerId_ < other.ledgerId_) {
- return true;
- } else if (ledgerId_ > other.ledgerId_) {
- return false;
- }
-
- if (entryId_ < other.entryId_) {
- return true;
- } else {
- return false;
- }
-}
-
-bool BatchMessageId::operator<=(const BatchMessageId& other) const { return *this < other || *this == other; }
-
-bool BatchMessageId::operator==(const BatchMessageId& other) const {
- return ledgerId_ == other.ledgerId_ && entryId_ == other.entryId_ && batchIndex_ == other.batchIndex_;
-}
-
-std::ostream& operator<<(std::ostream& s, const BatchMessageId& messageId) {
- s << '(' << messageId.ledgerId_ << ':' << messageId.entryId_ << ':' << messageId.batchIndex_ << ':'
- << messageId.partition_ << ')';
- return s;
-}
-
-#pragma GCC visibility pop
-} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index 68d366a..1390336 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -161,14 +161,14 @@ void ClientImpl::createReaderAsync(const std::string& topic, const MessageId& st
}
}
- BatchMessageId msgId(startMessageId);
+ MessageId msgId(startMessageId);
lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
boost::bind(&ClientImpl::handleReaderMetadataLookup, shared_from_this(), _1, _2, topicName, msgId,
conf, callback));
}
void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDataResultPtr partitionMetadata,
- TopicNamePtr topicName, BatchMessageId startMessageId,
+ TopicNamePtr topicName, MessageId startMessageId,
ReaderConfiguration conf, ReaderCallback callback) {
if (result != ResultOk) {
LOG_ERROR("Error Checking/Getting Partition Metadata while creating reader: " << result);
diff --git a/pulsar-client-cpp/lib/ClientImpl.h b/pulsar-client-cpp/lib/ClientImpl.h
index 105df8a..5283b58 100644
--- a/pulsar-client-cpp/lib/ClientImpl.h
+++ b/pulsar-client-cpp/lib/ClientImpl.h
@@ -91,7 +91,7 @@ class ClientImpl : public boost::enable_shared_from_this<ClientImpl> {
SubscribeCallback callback);
void handleReaderMetadataLookup(const Result result, const LookupDataResultPtr partitionMetadata,
- TopicNamePtr topicName, BatchMessageId startMessageId,
+ TopicNamePtr topicName, MessageId startMessageId,
ReaderConfiguration conf, ReaderCallback callback);
void handleProducerCreated(Result result, ProducerImplBaseWeakPtr producerWeakPtr,
diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index e0245c5..82e7abb 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -185,7 +185,7 @@ SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication, const
SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string& subscription,
uint64_t consumerId, uint64_t requestId, CommandSubscribe_SubType subType,
const std::string& consumerName, SubscriptionMode subscriptionMode,
- Optional<BatchMessageId> startMessageId) {
+ Optional<MessageId> startMessageId) {
BaseCommand cmd;
cmd.set_type(BaseCommand::SUBSCRIBE);
CommandSubscribe* subscribe = cmd.mutable_subscribe();
@@ -198,11 +198,11 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string&
subscribe->set_durable(subscriptionMode == SubscriptionModeDurable);
if (startMessageId.is_present()) {
MessageIdData& messageIdData = *subscribe->mutable_start_message_id();
- messageIdData.set_ledgerid(startMessageId.value().ledgerId_);
- messageIdData.set_entryid(startMessageId.value().entryId_);
+ messageIdData.set_ledgerid(startMessageId.value().ledgerId());
+ messageIdData.set_entryid(startMessageId.value().entryId());
- if (startMessageId.value().batchIndex_ != -1) {
- messageIdData.set_batch_index(startMessageId.value().batchIndex_);
+ if (startMessageId.value().batchIndex() != -1) {
+ messageIdData.set_batch_index(startMessageId.value().batchIndex());
}
}
@@ -442,7 +442,7 @@ void Commands::serializeSingleMessageInBatchWithPayload(const Message& msg, Shar
batchPayLoad.write(msg.impl_->payload.data(), payloadSize);
}
-Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage) {
+Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex) {
SharedBuffer& uncompressedPayload = batchedMessage.impl_->payload;
// Format of batch message
@@ -459,7 +459,9 @@ Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage) {
SharedBuffer payload = uncompressedPayload.slice(0, payloadSize);
uncompressedPayload.consume(payloadSize);
- Message singleMessage(batchedMessage.impl_->messageId, batchedMessage.impl_->metadata, payload, metadata);
+ const MessageId& m = batchedMessage.impl_->messageId;
+ MessageId singleMessageId(m.partition(), m.ledgerId(), m.entryId(), batchIndex);
+ Message singleMessage(singleMessageId, batchedMessage.impl_->metadata, payload, metadata);
singleMessage.impl_->cnx_ = batchedMessage.impl_->cnx_;
return singleMessage;
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index bda48d2..3746dc5 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -77,8 +77,7 @@ class Commands {
static SharedBuffer newSubscribe(const std::string& topic, const std::string& subscription,
uint64_t consumerId, uint64_t requestId,
proto::CommandSubscribe_SubType subType, const std::string& consumerName,
- SubscriptionMode subscriptionMode,
- Optional<BatchMessageId> startMessageId);
+ SubscriptionMode subscriptionMode, Optional<MessageId> startMessageId);
static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t requestId);
@@ -106,7 +105,7 @@ class Commands {
static void serializeSingleMessageInBatchWithPayload(const Message& msg, SharedBuffer& batchPayLoad,
const unsigned long& maxMessageSizeInBytes);
- static Message deSerializeSingleMessageInBatch(Message& batchedMessage);
+ static Message deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex);
static SharedBuffer newConsumerStats(uint64_t consumerId, uint64_t requestId);
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 55e4e6c..2abbdd0 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -36,8 +36,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
const std::string& subscription, const ConsumerConfiguration& conf,
const ExecutorServicePtr listenerExecutor /* = NULL by default */,
const ConsumerTopicType consumerTopicType /* = NonPartitioned by default */,
- Commands::SubscriptionMode subscriptionMode,
- Optional<BatchMessageId> startMessageId)
+ Commands::SubscriptionMode subscriptionMode, Optional<MessageId> startMessageId)
: HandlerBase(client, topic, Backoff(milliseconds(100), seconds(60), milliseconds(0))),
waitingForZeroQueueSizeMessage(false),
config_(conf),
@@ -122,7 +121,7 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
return;
}
- Optional<BatchMessageId> firstMessageInQueue = clearReceiveQueue();
+ Optional<MessageId> firstMessageInQueue = clearReceiveQueue();
unAckedMessageTrackerPtr_->clear();
batchAcknowledgementTracker_.clear();
@@ -271,8 +270,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
return;
}
- Message m(msg, metadata, payload);
- m.impl_->messageId.partition_ = partitionIndex_;
+ Message m(msg, metadata, payload, partitionIndex_);
m.impl_->cnx_ = cnx.get();
LOG_DEBUG(getName() << " metadata.num_messages_in_batch() = " << metadata.num_messages_in_batch());
@@ -320,18 +318,17 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
int skippedMessages = 0;
for (int i = 0; i < batchSize; i++) {
- batchedMessage.impl_->messageId.batchIndex_ = i;
// This is a cheap copy since message contains only one shared pointer (impl_)
- Message msg = Commands::deSerializeSingleMessageInBatch(batchedMessage);
+ Message msg = Commands::deSerializeSingleMessageInBatch(batchedMessage, i);
if (startMessageId_.is_present()) {
- const BatchMessageId& msgId = static_cast<const BatchMessageId&>(msg.getMessageId());
+ const MessageId& msgId = msg.getMessageId();
// If we are receiving a batch message, we need to discard messages that were prior
// to the startMessageId
- if (msgId.ledgerId_ == startMessageId_.value().ledgerId_ &&
- msgId.entryId_ == startMessageId_.value().entryId_ &&
- msgId.batchIndex_ <= startMessageId_.value().batchIndex_) {
+ if (msgId.ledgerId() == startMessageId_.value().ledgerId() &&
+ msgId.entryId() == startMessageId_.value().entryId() &&
+ msgId.batchIndex() <= startMessageId_.value().batchIndex()) {
LOG_DEBUG(getName() << "Ignoring message from before the startMessageId"
<< msg.getMessageId());
++skippedMessages;
@@ -559,7 +556,7 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) {
void ConsumerImpl::messageProcessed(Message& msg) {
Lock lock(mutex_);
- lastDequedMessage_ = Optional<BatchMessageId>::of(static_cast<const BatchMessageId&>(msg.getMessageId()));
+ lastDequedMessage_ = Optional<MessageId>::of(msg.getMessageId());
ClientConnectionPtr currentCnx = getCnx().lock();
if (currentCnx && msg.impl_->cnx_ != currentCnx.get()) {
@@ -575,23 +572,19 @@ void ConsumerImpl::messageProcessed(Message& msg) {
* was
* not seen by the application
*/
-Optional<BatchMessageId> ConsumerImpl::clearReceiveQueue() {
+Optional<MessageId> ConsumerImpl::clearReceiveQueue() {
Message nextMessageInQueue;
if (incomingMessages_.peekAndClear(nextMessageInQueue)) {
// There was at least one message pending in the queue
- // We can safely cast to 'BatchMessageId' since all the messages queued will have that type of message
- // id,
- // irrespective of whether they were part of a batch or not.
- const BatchMessageId& nextMessageId =
- static_cast<const BatchMessageId&>(nextMessageInQueue.getMessageId());
- BatchMessageId previousMessageId;
- if (nextMessageId.batchIndex_ >= 0) {
- previousMessageId = BatchMessageId(nextMessageId.ledgerId_, nextMessageId.entryId_,
- nextMessageId.batchIndex_ - 1);
+ const MessageId& nextMessageId = nextMessageInQueue.getMessageId();
+ MessageId previousMessageId;
+ if (nextMessageId.batchIndex() >= 0) {
+ previousMessageId = MessageId(-1, nextMessageId.ledgerId(), nextMessageId.entryId(),
+ nextMessageId.batchIndex() - 1);
} else {
- previousMessageId = BatchMessageId(nextMessageId.ledgerId_, nextMessageId.entryId_ - 1, -1);
+ previousMessageId = MessageId(-1, nextMessageId.ledgerId(), nextMessageId.entryId() - 1, -1);
}
- return Optional<BatchMessageId>::of(previousMessageId);
+ return Optional<MessageId>::of(previousMessageId);
} else if (lastDequedMessage_.is_present()) {
// If the queue was empty we need to restart from the message just after the last one that has been
// dequeued
@@ -646,23 +639,21 @@ void ConsumerImpl::statsCallback(Result res, ResultCallback callback, proto::Com
void ConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) {
ResultCallback cb =
boost::bind(&ConsumerImpl::statsCallback, this, _1, callback, proto::CommandAck_AckType_Individual);
- const BatchMessageId& batchMsgId = (const BatchMessageId&)msgId;
- if (batchMsgId.batchIndex_ != -1 &&
- !batchAcknowledgementTracker_.isBatchReady(batchMsgId, proto::CommandAck_AckType_Individual)) {
+ if (msgId.batchIndex() != -1 &&
+ !batchAcknowledgementTracker_.isBatchReady(msgId, proto::CommandAck_AckType_Individual)) {
cb(ResultOk);
return;
}
- doAcknowledge(batchMsgId, proto::CommandAck_AckType_Individual, cb);
+ doAcknowledge(msgId, proto::CommandAck_AckType_Individual, cb);
}
-void ConsumerImpl::acknowledgeCumulativeAsync(const MessageId& mId, ResultCallback callback) {
+void ConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) {
ResultCallback cb =
boost::bind(&ConsumerImpl::statsCallback, this, _1, callback, proto::CommandAck_AckType_Cumulative);
- const BatchMessageId& msgId = (const BatchMessageId&)mId;
- if (msgId.batchIndex_ != -1 &&
+ if (msgId.batchIndex() != -1 &&
!batchAcknowledgementTracker_.isBatchReady(msgId, proto::CommandAck_AckType_Cumulative)) {
- BatchMessageId messageId = batchAcknowledgementTracker_.getGreatestCumulativeAckReady(msgId);
- if (messageId == BatchMessageId()) {
+ MessageId messageId = batchAcknowledgementTracker_.getGreatestCumulativeAckReady(msgId);
+ if (messageId == MessageId()) {
// nothing to ack
cb(ResultOk);
} else {
@@ -673,11 +664,11 @@ void ConsumerImpl::acknowledgeCumulativeAsync(const MessageId& mId, ResultCallba
}
}
-void ConsumerImpl::doAcknowledge(const BatchMessageId& messageId, proto::CommandAck_AckType ackType,
+void ConsumerImpl::doAcknowledge(const MessageId& messageId, proto::CommandAck_AckType ackType,
ResultCallback callback) {
proto::MessageIdData messageIdData;
- messageIdData.set_ledgerid(messageId.ledgerId_);
- messageIdData.set_entryid(messageId.entryId_);
+ messageIdData.set_ledgerid(messageId.ledgerId());
+ messageIdData.set_entryid(messageId.entryId());
ClientConnectionPtr cnx = getCnx().lock();
if (cnx) {
SharedBuffer cmd = Commands::newAck(consumerId_, messageIdData, ackType, -1);
@@ -687,7 +678,7 @@ void ConsumerImpl::doAcknowledge(const BatchMessageId& messageId, proto::Command
} else {
unAckedMessageTrackerPtr_->removeMessagesTill(messageId);
}
- batchAcknowledgementTracker_.deleteAckedMessage((BatchMessageId&)messageId, ackType);
+ batchAcknowledgementTracker_.deleteAckedMessage(messageId, ackType);
callback(ResultOk);
LOG_DEBUG(getName() << "ack request sent for message - [" << messageIdData.ledgerid() << ","
<< messageIdData.entryid() << "]");
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index d115ed8..45a12ef 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -20,7 +20,7 @@
#define LIB_CONSUMERIMPL_H_
#include <string>
-#include "pulsar/BatchMessageId.h"
+
#include "pulsar/Result.h"
#include "UnboundedBlockingQueue.h"
#include "HandlerBase.h"
@@ -69,7 +69,7 @@ class ConsumerImpl : public ConsumerImplBase,
const ExecutorServicePtr listenerExecutor = ExecutorServicePtr(),
const ConsumerTopicType consumerTopicType = NonPartitioned,
Commands::SubscriptionMode = Commands::SubscriptionModeDurable,
- Optional<BatchMessageId> startMessageId = Optional<BatchMessageId>::empty());
+ Optional<MessageId> startMessageId = Optional<MessageId>::empty());
~ConsumerImpl();
void setPartitionIndex(int partitionIndex);
int getPartitionIndex();
@@ -82,7 +82,7 @@ class ConsumerImpl : public ConsumerImplBase,
inline proto::CommandSubscribe_SubType getSubType();
void unsubscribeAsync(ResultCallback callback);
void handleUnsubscribe(Result result, ResultCallback callback);
- void doAcknowledge(const BatchMessageId& messageId, proto::CommandAck_AckType ackType,
+ void doAcknowledge(const MessageId& messageId, proto::CommandAck_AckType ackType,
ResultCallback callback);
virtual void disconnectConsumer();
virtual Future<Result, ConsumerImplBaseWeakPtr> getConsumerCreatedFuture();
@@ -134,7 +134,7 @@ class ConsumerImpl : public ConsumerImplBase,
Result receiveHelper(Message& msg, int timeout);
void statsCallback(Result, ResultCallback, proto::CommandAck_AckType);
- Optional<BatchMessageId> clearReceiveQueue();
+ Optional<MessageId> clearReceiveQueue();
boost::mutex mutexForReceiveWithZeroQueueSize;
const ConsumerConfiguration config_;
@@ -145,15 +145,15 @@ class ConsumerImpl : public ConsumerImplBase,
ConsumerTopicType consumerTopicType_;
Commands::SubscriptionMode subscriptionMode_;
- Optional<BatchMessageId> startMessageId_;
+ Optional<MessageId> startMessageId_;
- Optional<BatchMessageId> lastDequedMessage_;
+ Optional<MessageId> lastDequedMessage_;
UnboundedBlockingQueue<Message> incomingMessages_;
int availablePermits_;
uint64_t consumerId_;
std::string consumerName_;
std::string consumerStr_;
- short partitionIndex_;
+ int32_t partitionIndex_;
Promise<Result, ConsumerImplBaseWeakPtr> consumerCreatedPromise_;
bool messageListenerRunning_;
boost::mutex messageListenerMutex_;
diff --git a/pulsar-client-cpp/lib/Message.cc b/pulsar-client-cpp/lib/Message.cc
index 79d7ca4..6b46fd0 100644
--- a/pulsar-client-cpp/lib/Message.cc
+++ b/pulsar-client-cpp/lib/Message.cc
@@ -34,7 +34,7 @@ using namespace pulsar;
namespace pulsar {
const static std::string emptyString;
-const static BatchMessageId invalidMessageId;
+const static MessageId invalidMessageId;
const Message::StringMap& Message::getProperties() const { return impl_->properties(); }
@@ -62,14 +62,17 @@ Message::Message() : impl_() {}
Message::Message(MessageImplPtr& impl) : impl_(impl) {}
-Message::Message(const proto::CommandMessage& msg, proto::MessageMetadata& metadata, SharedBuffer& payload)
+Message::Message(const proto::CommandMessage& msg, proto::MessageMetadata& metadata, SharedBuffer& payload,
+ int32_t partition)
: impl_(boost::make_shared<MessageImpl>()) {
- impl_->messageId = BatchMessageId(msg.message_id().ledgerid(), msg.message_id().entryid());
+ impl_->messageId =
+ MessageId(partition, msg.message_id().ledgerid(), msg.message_id().entryid(), /* batchId */
+ -1);
impl_->metadata = metadata;
impl_->payload = payload;
}
-Message::Message(const BatchMessageId& messageID, proto::MessageMetadata& metadata, SharedBuffer& payload,
+Message::Message(const MessageId& messageID, proto::MessageMetadata& metadata, SharedBuffer& payload,
proto::SingleMessageMetadata& singleMetadata)
: impl_(boost::make_shared<MessageImpl>()) {
impl_->messageId = messageID;
diff --git a/pulsar-client-cpp/lib/MessageId.cc b/pulsar-client-cpp/lib/MessageId.cc
index dc1ff38..c5314d8 100644
--- a/pulsar-client-cpp/lib/MessageId.cc
+++ b/pulsar-client-cpp/lib/MessageId.cc
@@ -18,9 +18,9 @@
*/
#include <pulsar/MessageId.h>
-#include <pulsar/BatchMessageId.h>
#include "PulsarApi.pb.h"
+#include "MessageIdImpl.h"
#include <iostream>
#include <limits>
@@ -30,43 +30,40 @@
namespace pulsar {
-MessageId::MessageId() : ledgerId_(-1), entryId_(-1), partition_(-1) {}
+MessageId::MessageId() {
+ static const MessageIdImplPtr emptyMessageId = boost::make_shared<MessageIdImpl>();
+ impl_ = emptyMessageId;
+}
MessageId& MessageId::operator=(const MessageId& m) {
- entryId_ = m.entryId_;
- ledgerId_ = m.ledgerId_;
- partition_ = m.partition_;
+ impl_ = m.impl_;
return *this;
}
-MessageId::MessageId(int64_t ledgerId, int64_t entryId)
- : ledgerId_(ledgerId), entryId_(entryId), partition_(-1) {
- // partition is set explicitly in consumerImpl when message is received
- // consumer's partition is assigned to this partition
-}
-
-int64_t MessageId::getBatchIndex() const {
- // It's only relevant for batch message ids
- return -1;
-}
+MessageId::MessageId(int32_t partition, int64_t ledgerId, int64_t entryId, int32_t batchIndex)
+ : impl_(boost::make_shared<MessageIdImpl>(partition, ledgerId, entryId, batchIndex)) {}
const MessageId& MessageId::earliest() {
- static const BatchMessageId _earliest(-1, -1);
+ static const MessageId _earliest(-1, -1, -1, -1);
return _earliest;
}
const MessageId& MessageId::latest() {
- // For entry-id we only have 48bits
- static const BatchMessageId _latest(std::numeric_limits<int64_t>::max(), (int64_t)(pow(2, 47) - 1));
+ static const int64_t long_max = std::numeric_limits<int64_t>::max();
+ static const MessageId _latest(-1, long_max, long_max, -1);
return _latest;
}
void MessageId::serialize(std::string& result) const {
proto::MessageIdData idData;
- idData.set_ledgerid(ledgerId_);
- idData.set_entryid(entryId_);
- if (partition_ != -1) {
- idData.set_partition(partition_);
+ idData.set_ledgerid(impl_->ledgerId_);
+ idData.set_entryid(impl_->entryId_);
+ if (impl_->partition_ != -1) {
+ idData.set_partition(impl_->partition_);
+ }
+
+ if (impl_->batchIndex_ != -1) {
+ idData.set_batch_index(impl_->batchIndex_);
}
idData.SerializeToString(&result);
@@ -75,38 +72,63 @@ void MessageId::serialize(std::string& result) const {
/**
* Deserialize a message id from a binary string
*/
-boost::shared_ptr<MessageId> MessageId::deserialize(const std::string& serializedMessageId) {
+MessageId MessageId::deserialize(const std::string& serializedMessageId) {
proto::MessageIdData idData;
if (!idData.ParseFromString(serializedMessageId)) {
throw "Failed to parse serialized message id";
}
- return boost::make_shared<BatchMessageId>(idData.ledgerid(), idData.entryid(), idData.batch_index());
+ return MessageId(idData.partition(), idData.ledgerid(), idData.entryid(), idData.batch_index());
}
+int64_t MessageId::ledgerId() const { return impl_->ledgerId_; }
+
+int64_t MessageId::entryId() const { return impl_->entryId_; }
+
+int32_t MessageId::batchIndex() const { return impl_->batchIndex_; }
+
+int32_t MessageId::partition() const { return impl_->partition_; }
+
#pragma GCC visibility push(default)
+
std::ostream& operator<<(std::ostream& s, const pulsar::MessageId& messageId) {
- s << '(' << messageId.ledgerId_ << ',' << messageId.entryId_ << ',' << messageId.partition_ << ')';
+ s << '(' << messageId.impl_->ledgerId_ << ',' << messageId.impl_->entryId_ << ','
+ << messageId.impl_->batchIndex_ << ',' << messageId.impl_->partition_ << ')';
return s;
}
bool MessageId::operator<(const MessageId& other) const {
- if (ledgerId_ < other.ledgerId_) {
+ if (impl_->ledgerId_ < other.impl_->ledgerId_) {
+ return true;
+ } else if (impl_->ledgerId_ > other.impl_->ledgerId_) {
+ return false;
+ }
+
+ if (impl_->entryId_ < other.impl_->entryId_) {
return true;
- } else if (ledgerId_ > other.ledgerId_) {
+ } else if (impl_->entryId_ > other.impl_->entryId_) {
return false;
}
- if (entryId_ < other.entryId_) {
+ if (impl_->batchIndex_ < other.impl_->batchIndex_) {
return true;
} else {
return false;
}
}
+bool MessageId::operator<=(const MessageId& other) const { return *this < other || *this == other; }
+
+bool MessageId::operator>(const MessageId& other) const { return !(*this <= other); }
+
+bool MessageId::operator>=(const MessageId& other) const { return !(*this < other); }
+
bool MessageId::operator==(const MessageId& other) const {
- return ledgerId_ == other.ledgerId_ && entryId_ == other.entryId_;
+ return impl_->ledgerId_ == other.impl_->ledgerId_ && impl_->entryId_ == other.impl_->entryId_ &&
+ impl_->batchIndex_ == other.impl_->batchIndex_ && impl_->partition_ == other.impl_->partition_;
}
+bool MessageId::operator!=(const MessageId& other) const { return !(*this == other); }
+
#pragma GCC visibility pop
} // namespace pulsar
diff --git a/pulsar-client-cpp/tests/MessageIdTest.cc b/pulsar-client-cpp/lib/MessageIdImpl.h
similarity index 61%
copy from pulsar-client-cpp/tests/MessageIdTest.cc
copy to pulsar-client-cpp/lib/MessageIdImpl.h
index 7402143..a3fc171 100644
--- a/pulsar-client-cpp/tests/MessageIdTest.cc
+++ b/pulsar-client-cpp/lib/MessageIdImpl.h
@@ -16,21 +16,21 @@
* specific language governing permissions and limitations
* under the License.
*/
-#include <pulsar/BatchMessageId.h>
-#include <gtest/gtest.h>
+#pragma once
-#include <string>
+#include <cstdint>
-using namespace pulsar;
+namespace pulsar {
-TEST(MessageIdTest, testSerialization) {
- BatchMessageId msgId(1, 2, 3);
-
- std::string serialized;
- msgId.serialize(serialized);
-
- boost::shared_ptr<MessageId> deserialized = MessageId::deserialize(serialized);
-
- ASSERT_EQ(msgId, static_cast<const BatchMessageId&>(*deserialized));
-}
+class MessageIdImpl {
+ public:
+ MessageIdImpl() : ledgerId_(-1), entryId_(-1), partition_(-1), batchIndex_(-1) {}
+ MessageIdImpl(int32_t partition, int64_t ledgerId, int64_t entryId, int32_t batchIndex)
+ : ledgerId_(ledgerId), entryId_(entryId), partition_(partition), batchIndex_(batchIndex) {}
+ const int64_t ledgerId_;
+ const int64_t entryId_;
+ const int32_t partition_;
+ const int32_t batchIndex_;
+};
+} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/MessageImpl.h b/pulsar-client-cpp/lib/MessageImpl.h
index 23a8fcf..f3753b1 100644
--- a/pulsar-client-cpp/lib/MessageImpl.h
+++ b/pulsar-client-cpp/lib/MessageImpl.h
@@ -21,7 +21,6 @@
#include <pulsar/Message.h>
#include <pulsar/MessageId.h>
-#include "pulsar/BatchMessageId.h"
#include "SharedBuffer.h"
#include "PulsarApi.pb.h"
@@ -42,7 +41,7 @@ class MessageImpl {
proto::MessageMetadata metadata;
SharedBuffer payload;
- BatchMessageId messageId;
+ MessageId messageId;
ClientConnection* cnx_;
const std::string& getPartitionKey() const;
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index f9c02b3..dc218ef 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -150,7 +150,7 @@ void PartitionedConsumerImpl::handleUnsubscribeAsync(Result result, unsigned int
}
void PartitionedConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) {
- int partition = msgId.partition_;
+ int32_t partition = msgId.partition();
assert(partition < numPartitions_ && partition >= 0 && consumers_.size() > partition);
unAckedMessageTrackerPtr_->remove(msgId);
consumers_[partition]->acknowledgeAsync(msgId, callback);
@@ -313,7 +313,7 @@ bool PartitionedConsumerImpl::isOpen() {
}
void PartitionedConsumerImpl::messageReceived(Consumer consumer, const Message& msg) {
- LOG_DEBUG("Received Message from one of the partition - " << msg.impl_->messageId.partition_);
+ LOG_DEBUG("Received Message from one of the partition - " << msg.impl_->messageId.partition());
messages_.push(msg);
if (messageListener_) {
listenerExecutor_->postWork(
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index 728573a..6808aad 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -130,7 +130,6 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac
}
// find a producer for that partition, index should start from 0
ProducerImplPtr& producer = producers_[partition];
- msg.impl_->messageId.partition_ = partition;
// send message on that partition
producer->sendAsync(msg, callback);
}
diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc b/pulsar-client-cpp/lib/ReaderImpl.cc
index 308908d..5e69a72 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.cc
+++ b/pulsar-client-cpp/lib/ReaderImpl.cc
@@ -28,7 +28,7 @@ ReaderImpl::ReaderImpl(const ClientImplPtr client, const std::string& topic, con
const ExecutorServicePtr listenerExecutor, ReaderCallback readerCreatedCallback)
: topic_(topic), client_(client), readerConf_(conf), readerCreatedCallback_(readerCreatedCallback) {}
-void ReaderImpl::start(const BatchMessageId& startMessageId) {
+void ReaderImpl::start(const MessageId& startMessageId) {
ConsumerConfiguration consumerConf;
consumerConf.setConsumerType(ConsumerExclusive);
consumerConf.setReceiverQueueSize(readerConf_.getReceiverQueueSize());
@@ -51,7 +51,7 @@ void ReaderImpl::start(const BatchMessageId& startMessageId) {
consumer_ = boost::make_shared<ConsumerImpl>(
client_.lock(), topic_, subscription, consumerConf, ExecutorServicePtr(), NonPartitioned,
- Commands::SubscriptionModeNonDurable, Optional<BatchMessageId>::of(startMessageId));
+ Commands::SubscriptionModeNonDurable, Optional<MessageId>::of(startMessageId));
consumer_->getConsumerCreatedFuture().addListener(
boost::bind(&ReaderImpl::handleConsumerCreated, shared_from_this(), _1, _2));
consumer_->start();
@@ -87,13 +87,11 @@ void ReaderImpl::acknowledgeIfNecessary(Result result, const Message& msg) {
return;
}
- const BatchMessageId& msgId = static_cast<const BatchMessageId&>(msg.getMessageId());
-
// Only acknowledge on the first message in the batch
- if (msgId.batchIndex_ <= 0) {
+ if (msg.getMessageId().batchIndex() <= 0) {
// Acknowledge message immediately because the reader is based on non-durable
// subscription. When it reconnects, it will specify the subscription position anyway
- consumer_->acknowledgeCumulativeAsync(msgId, emptyCallback);
+ consumer_->acknowledgeCumulativeAsync(msg.getMessageId(), emptyCallback);
}
}
diff --git a/pulsar-client-cpp/lib/ReaderImpl.h b/pulsar-client-cpp/lib/ReaderImpl.h
index 97c7f6b..61216b8 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.h
+++ b/pulsar-client-cpp/lib/ReaderImpl.h
@@ -36,7 +36,7 @@ class ReaderImpl : public boost::enable_shared_from_this<ReaderImpl> {
ReaderImpl(const ClientImplPtr client, const std::string& topic, const ReaderConfiguration& conf,
const ExecutorServicePtr listenerExecutor, ReaderCallback readerCreatedCallback);
- void start(const BatchMessageId& startMessageId);
+ void start(const MessageId& startMessageId);
const std::string& getTopic() const;
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
index 5930606..90006b6 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
@@ -85,14 +85,14 @@ long UnAckedMessageTrackerEnabled::size() {
void UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) {
boost::unique_lock<boost::mutex> acquire(lock_);
for (std::set<MessageId>::iterator it = oldSet_.begin(); it != oldSet_.end();) {
- if (*it < msgId && it->partition_ == msgId.partition_) {
+ if (*it < msgId && it->partition() == msgId.partition()) {
oldSet_.erase(it++);
} else {
it++;
}
}
for (std::set<MessageId>::iterator it = currentSet_.begin(); it != currentSet_.end();) {
- if (*it < msgId && it->partition_ == msgId.partition_) {
+ if (*it < msgId && it->partition() == msgId.partition()) {
currentSet_.erase(it++);
} else {
it++;
diff --git a/pulsar-client-cpp/python/src/client.cc b/pulsar-client-cpp/python/src/client.cc
index ad78ed2..b5295ac 100644
--- a/pulsar-client-cpp/python/src/client.cc
+++ b/pulsar-client-cpp/python/src/client.cc
@@ -44,7 +44,7 @@ Consumer Client_subscribe(Client& client, const std::string& topic, const std::s
}
Reader Client_createReader(Client& client, const std::string& topic,
- const BatchMessageId& startMessageId,
+ const MessageId& startMessageId,
const ReaderConfiguration& conf) {
Reader reader;
Result res;
diff --git a/pulsar-client-cpp/python/src/message.cc b/pulsar-client-cpp/python/src/message.cc
index b7513d3..1d1ee23 100644
--- a/pulsar-client-cpp/python/src/message.cc
+++ b/pulsar-client-cpp/python/src/message.cc
@@ -20,13 +20,13 @@
#include <boost/python/suite/indexing/map_indexing_suite.hpp>
-std::string MessageId_str(const BatchMessageId& msgId) {
+std::string MessageId_str(const MessageId& msgId) {
std::stringstream ss;
ss << msgId;
return ss.str();
}
-std::string MessageId_serialize(const BatchMessageId& msgId) {
+std::string MessageId_serialize(const MessageId& msgId) {
std::string serialized;
msgId.serialize(serialized);
return serialized;
@@ -42,8 +42,8 @@ boost::python::object Message_data(const Message& msg) {
return boost::python::object(boost::python::handle<>(PyBytes_FromStringAndSize((const char*)msg.getData(), msg.getLength())));
}
-const BatchMessageId& Message_getMessageId(const Message& msg) {
- return static_cast<const BatchMessageId&>(msg.getMessageId());
+const MessageId& Message_getMessageId(const Message& msg) {
+ return msg.getMessageId();
}
void export_message() {
@@ -67,10 +67,10 @@ void export_message() {
.def(map_indexing_suite<Message::StringMap>())
;
- static const BatchMessageId& _MessageId_earliest = static_cast<const BatchMessageId&>(MessageId::earliest());
- static const BatchMessageId& _MessageId_latest = static_cast<const BatchMessageId&>(MessageId::latest());
+ static const MessageId& _MessageId_earliest = MessageId::earliest();
+ static const MessageId& _MessageId_latest = MessageId::latest();
- class_<BatchMessageId, boost::shared_ptr<BatchMessageId> >("MessageId")
+ class_<MessageId>("MessageId")
.def("__str__", &MessageId_str)
.add_static_property("earliest", make_getter(&_MessageId_earliest))
.add_static_property("latest", make_getter(&_MessageId_latest))
diff --git a/pulsar-client-cpp/tests/BatchMessageTest.cc b/pulsar-client-cpp/tests/BatchMessageTest.cc
index dda8f79..34c38db 100644
--- a/pulsar-client-cpp/tests/BatchMessageTest.cc
+++ b/pulsar-client-cpp/tests/BatchMessageTest.cc
@@ -228,7 +228,7 @@ TEST(BatchMessageTest, testBatchSizeInBytes) {
std::string expectedMessageContent = prefix + boost::lexical_cast<std::string>(i);
LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = "
<< receivedMsg.getMessageId() << "]");
- ASSERT_LT(pulsar::PulsarFriend::getBatchIndex((BatchMessageId&)receivedMsg.getMessageId()), 2);
+ ASSERT_LT(pulsar::PulsarFriend::getBatchIndex(receivedMsg.getMessageId()), 2);
ASSERT_EQ(receivedMsg.getProperty("msgIndex"), boost::lexical_cast<std::string>(i++));
ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
diff --git a/pulsar-client-cpp/tests/MessageIdTest.cc b/pulsar-client-cpp/tests/MessageIdTest.cc
index 7402143..06c2528 100644
--- a/pulsar-client-cpp/tests/MessageIdTest.cc
+++ b/pulsar-client-cpp/tests/MessageIdTest.cc
@@ -16,7 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-#include <pulsar/BatchMessageId.h>
+#include <pulsar/MessageId.h>
+#include "PulsarFriend.h"
#include <gtest/gtest.h>
@@ -25,12 +26,12 @@
using namespace pulsar;
TEST(MessageIdTest, testSerialization) {
- BatchMessageId msgId(1, 2, 3);
+ MessageId msgId = PulsarFriend::getMessageId(-1, 1, 2, 3);
std::string serialized;
msgId.serialize(serialized);
- boost::shared_ptr<MessageId> deserialized = MessageId::deserialize(serialized);
+ MessageId deserialized = MessageId::deserialize(serialized);
- ASSERT_EQ(msgId, static_cast<const BatchMessageId&>(*deserialized));
+ ASSERT_EQ(msgId, deserialized);
}
diff --git a/pulsar-client-cpp/tests/PulsarFriend.h b/pulsar-client-cpp/tests/PulsarFriend.h
index c64bab2..f2d1a69 100644
--- a/pulsar-client-cpp/tests/PulsarFriend.h
+++ b/pulsar-client-cpp/tests/PulsarFriend.h
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-#include <pulsar/BatchMessageId.h>
+
#include <lib/ProducerImpl.h>
#include <lib/ConsumerImpl.h>
#include <string>
@@ -26,7 +26,11 @@ using std::string;
namespace pulsar {
class PulsarFriend {
public:
- static int getBatchIndex(const BatchMessageId& mId) { return mId.batchIndex_; }
+ static MessageId getMessageId(int32_t partition, int64_t ledgerId, int64_t entryId, int32_t batchIndex) {
+ return MessageId(partition, ledgerId, entryId, batchIndex);
+ }
+
+ static int getBatchIndex(const MessageId& mId) { return mId.batchIndex(); }
static ProducerStatsImplPtr getProducerStatsPtr(Producer producer) {
ProducerImpl* producerImpl = static_cast<ProducerImpl*>(producer.impl_.get());
diff --git a/pulsar-client-cpp/tests/ReaderTest.cc b/pulsar-client-cpp/tests/ReaderTest.cc
index 0e31f6a..be9678d 100644
--- a/pulsar-client-cpp/tests/ReaderTest.cc
+++ b/pulsar-client-cpp/tests/ReaderTest.cc
@@ -257,7 +257,7 @@ TEST(ReaderTest, testReaderOnSpecificMessageWithBatches) {
// Create another reader starting on msgid4
auto msgId4 = MessageId::deserialize(lastMessageId);
Reader reader2;
- ASSERT_EQ(ResultOk, client.createReader(topicName, *msgId4, readerConf, reader2));
+ ASSERT_EQ(ResultOk, client.createReader(topicName, msgId4, readerConf, reader2));
for (int i = 5; i < 11; i++) {
Message msg;
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.