You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/12/07 03:13:40 UTC
[pulsar-client-cpp] branch main updated: Use boost::optional instead self-written Optional class (#138)
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 18dcdd5 Use boost::optional instead self-written Optional class (#138)
18dcdd5 is described below
commit 18dcdd55afa4694933d293cda4035365dcb720a4
Author: A <fr...@ngs.ru>
AuthorDate: Wed Dec 7 10:13:35 2022 +0700
Use boost::optional instead self-written Optional class (#138)
Fixes #134
### Motivation
Switch to use boost::optional instead self-written Optional class
### Modifications
Remove class Optional in Utils.h and change all usage to boost::optional
---
lib/ClientConnection.cc | 5 +++--
lib/ClientConnection.h | 4 ++--
lib/Commands.cc | 8 ++++----
lib/Commands.h | 22 ++++++++++------------
lib/ConsumerImpl.cc | 40 ++++++++++++++++++++--------------------
lib/ConsumerImpl.h | 17 +++++++++--------
lib/MultiTopicsConsumerImpl.cc | 15 +++++++--------
lib/ProducerConfiguration.cc | 8 ++++----
lib/ProducerConfigurationImpl.h | 7 +++----
lib/ProducerImpl.h | 4 ++--
lib/ReaderImpl.cc | 7 +++----
lib/SynchronizedHashMap.h | 17 ++++++++---------
lib/Utils.h | 32 --------------------------------
tests/SynchronizedHashMapTest.cc | 18 +++++++++---------
14 files changed, 84 insertions(+), 120 deletions(-)
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 5c14467..51a09f4 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -20,6 +20,7 @@
#include <pulsar/MessageIdBuilder.h>
+#include <boost/optional.hpp>
#include <fstream>
#include "Commands.h"
@@ -1093,9 +1094,9 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
data.schemaVersion = producerSuccess.schema_version();
}
if (producerSuccess.has_topic_epoch()) {
- data.topicEpoch = Optional<uint64_t>::of(producerSuccess.topic_epoch());
+ data.topicEpoch = boost::make_optional(producerSuccess.topic_epoch());
} else {
- data.topicEpoch = Optional<uint64_t>::empty();
+ data.topicEpoch = boost::none;
}
requestData.promise.setValue(data);
requestData.timer->cancel();
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index a07e2cd..62a5e9b 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -29,6 +29,7 @@
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <boost/asio/strand.hpp>
+#include <boost/optional.hpp>
#include <deque>
#include <functional>
#include <memory>
@@ -40,7 +41,6 @@
#include "LookupDataResult.h"
#include "SharedBuffer.h"
#include "UtilAllocator.h"
-#include "Utils.h"
namespace pulsar {
@@ -83,7 +83,7 @@ struct ResponseData {
std::string producerName;
int64_t lastSequenceId;
std::string schemaVersion;
- Optional<uint64_t> topicEpoch;
+ boost::optional<uint64_t> topicEpoch;
};
typedef std::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;
diff --git a/lib/Commands.cc b/lib/Commands.cc
index f97b0eb..5bb9587 100644
--- a/lib/Commands.cc
+++ b/lib/Commands.cc
@@ -296,7 +296,7 @@ SharedBuffer Commands::newAuthResponse(const AuthenticationPtr& authentication,
SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string& subscription,
uint64_t consumerId, uint64_t requestId, CommandSubscribe_SubType subType,
const std::string& consumerName, SubscriptionMode subscriptionMode,
- Optional<MessageId> startMessageId, bool readCompacted,
+ boost::optional<MessageId> startMessageId, bool readCompacted,
const std::map<std::string, std::string>& metadata,
const std::map<std::string, std::string>& subscriptionProperties,
const SchemaInfo& schemaInfo,
@@ -323,7 +323,7 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string&
subscribe->set_allocated_schema(getSchema(schemaInfo));
}
- if (startMessageId.is_present()) {
+ if (startMessageId) {
MessageIdData& messageIdData = *subscribe->mutable_start_message_id();
messageIdData.set_ledgerid(startMessageId.value().ledgerId());
messageIdData.set_entryid(startMessageId.value().entryId());
@@ -383,7 +383,7 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId
const std::map<std::string, std::string>& metadata,
const SchemaInfo& schemaInfo, uint64_t epoch,
bool userProvidedProducerName, bool encrypted,
- ProducerAccessMode accessMode, Optional<uint64_t> topicEpoch) {
+ ProducerAccessMode accessMode, boost::optional<uint64_t> topicEpoch) {
BaseCommand cmd;
cmd.set_type(BaseCommand::PRODUCER);
CommandProducer* producer = cmd.mutable_producer();
@@ -394,7 +394,7 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId
producer->set_user_provided_producer_name(userProvidedProducerName);
producer->set_encrypted(encrypted);
producer->set_producer_access_mode(static_cast<proto::ProducerAccessMode>(accessMode));
- if (topicEpoch.is_present()) {
+ if (topicEpoch) {
producer->set_topic_epoch(topicEpoch.value());
}
diff --git a/lib/Commands.h b/lib/Commands.h
index 6681f13..bbe96fd 100644
--- a/lib/Commands.h
+++ b/lib/Commands.h
@@ -25,11 +25,11 @@
#include <pulsar/Schema.h>
#include <pulsar/defines.h>
+#include <boost/optional.hpp>
#include <set>
#include "ProtoApiEnums.h"
#include "SharedBuffer.h"
-#include "Utils.h"
using namespace pulsar;
@@ -89,16 +89,14 @@ class Commands {
uint64_t sequenceId, ChecksumType checksumType,
const proto::MessageMetadata& metadata, const SharedBuffer& payload);
- static SharedBuffer newSubscribe(const std::string& topic, const std::string& subscription,
- uint64_t consumerId, uint64_t requestId,
- CommandSubscribe_SubType subType, const std::string& consumerName,
- SubscriptionMode subscriptionMode, Optional<MessageId> startMessageId,
- bool readCompacted, const std::map<std::string, std::string>& metadata,
- const std::map<std::string, std::string>& subscriptionProperties,
- const SchemaInfo& schemaInfo,
- CommandSubscribe_InitialPosition subscriptionInitialPosition,
- bool replicateSubscriptionState, KeySharedPolicy keySharedPolicy,
- int priorityLevel = 0);
+ static SharedBuffer newSubscribe(
+ const std::string& topic, const std::string& subscription, uint64_t consumerId, uint64_t requestId,
+ CommandSubscribe_SubType subType, const std::string& consumerName, SubscriptionMode subscriptionMode,
+ boost::optional<MessageId> startMessageId, bool readCompacted,
+ const std::map<std::string, std::string>& metadata,
+ const std::map<std::string, std::string>& subscriptionProperties, const SchemaInfo& schemaInfo,
+ CommandSubscribe_InitialPosition subscriptionInitialPosition, bool replicateSubscriptionState,
+ KeySharedPolicy keySharedPolicy, int priorityLevel = 0);
static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t requestId);
@@ -107,7 +105,7 @@ class Commands {
const std::map<std::string, std::string>& metadata,
const SchemaInfo& schemaInfo, uint64_t epoch,
bool userProvidedProducerName, bool encrypted,
- ProducerAccessMode accessMode, Optional<uint64_t> topicEpoch);
+ ProducerAccessMode accessMode, boost::optional<uint64_t> topicEpoch);
static SharedBuffer newAck(uint64_t consumerId, int64_t ledgerId, int64_t entryId,
CommandAck_AckType ackType, CommandAck_ValidationError validationError);
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 41b13ae..cf01e36 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -54,7 +54,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
const ExecutorServicePtr listenerExecutor /* = NULL by default */,
bool hasParent /* = false by default */,
const ConsumerTopicType consumerTopicType /* = NonPartitioned by default */,
- Commands::SubscriptionMode subscriptionMode, Optional<MessageId> startMessageId)
+ Commands::SubscriptionMode subscriptionMode,
+ boost::optional<MessageId> startMessageId)
: ConsumerImplBase(client, topic, Backoff(milliseconds(100), seconds(60), milliseconds(0)), conf,
listenerExecutor ? listenerExecutor : client->getListenerExecutorProvider()->get()),
waitingForZeroQueueSizeMessage(false),
@@ -191,9 +192,8 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
Lock lockForMessageId(mutexForMessageId_);
// Update startMessageId so that we can discard messages after delivery restarts
const auto startMessageId = clearReceiveQueue();
- const auto subscribeMessageId = (subscriptionMode_ == Commands::SubscriptionModeNonDurable)
- ? startMessageId
- : Optional<MessageId>::empty();
+ const auto subscribeMessageId =
+ (subscriptionMode_ == Commands::SubscriptionModeNonDurable) ? startMessageId : boost::none;
startMessageId_ = startMessageId;
lockForMessageId.unlock();
@@ -373,11 +373,11 @@ void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
});
}
-Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& payload,
- const proto::MessageMetadata& metadata,
- const MessageId& messageId,
- const proto::MessageIdData& messageIdData,
- const ClientConnectionPtr& cnx) {
+boost::optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& payload,
+ const proto::MessageMetadata& metadata,
+ const MessageId& messageId,
+ const proto::MessageIdData& messageIdData,
+ const ClientConnectionPtr& cnx) {
const auto chunkId = metadata.chunk_id();
const auto uuid = metadata.uuid();
LOG_DEBUG("Process message chunk (chunkId: " << chunkId << ", uuid: " << uuid
@@ -422,14 +422,14 @@ Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& pay
lock.unlock();
increaseAvailablePermits(cnx);
trackMessage(messageId);
- return Optional<SharedBuffer>::empty();
+ return boost::none;
}
chunkedMsgCtx.appendChunk(messageId, payload);
if (!chunkedMsgCtx.isCompleted()) {
lock.unlock();
increaseAvailablePermits(cnx);
- return Optional<SharedBuffer>::empty();
+ return boost::none;
}
LOG_DEBUG("Chunked message completed chunkId: " << chunkId << ", ChunkedMessageCtx: " << chunkedMsgCtx
@@ -438,9 +438,9 @@ Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& pay
auto wholePayload = chunkedMsgCtx.getBuffer();
chunkedMessageCache_.remove(uuid);
if (uncompressMessageIfNeeded(cnx, messageIdData, metadata, wholePayload, false)) {
- return Optional<SharedBuffer>::of(wholePayload);
+ return wholePayload;
} else {
- return Optional<SharedBuffer>::empty();
+ return boost::none;
}
}
@@ -477,7 +477,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
const auto& messageIdData = msg.message_id();
auto messageId = MessageIdBuilder::from(messageIdData).build();
auto optionalPayload = processMessageChunk(payload, metadata, messageId, messageIdData, cnx);
- if (optionalPayload.is_present()) {
+ if (optionalPayload) {
payload = optionalPayload.value();
} else {
return;
@@ -512,7 +512,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
m.impl_->convertPayloadToKeyValue(config_.getSchema());
const auto startMessageId = startMessageId_.get();
- if (isPersistent_ && startMessageId.is_present() &&
+ if (isPersistent_ && startMessageId &&
m.getMessageId().ledgerId() == startMessageId.value().ledgerId() &&
m.getMessageId().entryId() == startMessageId.value().entryId() &&
isPriorEntryIndex(m.getMessageId().entryId())) {
@@ -637,7 +637,7 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
msg.impl_->setTopicName(batchedMessage.getTopicName());
msg.impl_->convertPayloadToKeyValue(config_.getSchema());
- if (startMessageId.is_present()) {
+ if (startMessageId) {
const MessageId& msgId = msg.getMessageId();
// If we are receiving a batch message, we need to discard messages that were prior
@@ -921,10 +921,10 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) {
* was
* not seen by the application
*/
-Optional<MessageId> ConsumerImpl::clearReceiveQueue() {
+boost::optional<MessageId> ConsumerImpl::clearReceiveQueue() {
bool expectedDuringSeek = true;
if (duringSeek_.compare_exchange_strong(expectedDuringSeek, false)) {
- return Optional<MessageId>::of(seekMessageId_.get());
+ return seekMessageId_.get();
} else if (subscriptionMode_ == Commands::SubscriptionModeDurable) {
return startMessageId_.get();
}
@@ -943,12 +943,12 @@ Optional<MessageId> ConsumerImpl::clearReceiveQueue() {
.ledgerId(nextMessageId.ledgerId())
.entryId(nextMessageId.entryId() - 1)
.build();
- return Optional<MessageId>::of(previousMessageId);
+ return previousMessageId;
} else if (lastDequedMessageId_ != MessageId::earliest()) {
// If the queue was empty we need to restart from the message just after the last one that has been
// dequeued
// in the past
- return Optional<MessageId>::of(lastDequedMessageId_);
+ return lastDequedMessageId_;
} else {
// No message was received or dequeued by this consumer. Next message would still be the
// startMessageId
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index ce6c3f0..e9af3f6 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -21,6 +21,7 @@
#include <pulsar/Reader.h>
+#include <boost/optional.hpp>
#include <functional>
#include <memory>
@@ -71,7 +72,7 @@ class ConsumerImpl : public ConsumerImplBase {
const ExecutorServicePtr listenerExecutor = ExecutorServicePtr(), bool hasParent = false,
const ConsumerTopicType consumerTopicType = NonPartitioned,
Commands::SubscriptionMode = Commands::SubscriptionModeDurable,
- Optional<MessageId> startMessageId = Optional<MessageId>::empty());
+ boost::optional<MessageId> startMessageId = boost::none);
~ConsumerImpl();
void setPartitionIndex(int partitionIndex);
int getPartitionIndex();
@@ -193,7 +194,7 @@ class ConsumerImpl : public ConsumerImplBase {
const DeadlineTimerPtr& timer,
BrokerGetLastMessageIdCallback callback);
- Optional<MessageId> clearReceiveQueue();
+ boost::optional<MessageId> clearReceiveQueue();
void seekAsyncInternal(long requestId, SharedBuffer seek, const MessageId& seekId, long timestamp,
ResultCallback callback);
@@ -236,7 +237,7 @@ class ConsumerImpl : public ConsumerImplBase {
MessageId lastMessageIdInBroker_{MessageId::earliest()};
std::atomic_bool duringSeek_{false};
- Synchronized<Optional<MessageId>> startMessageId_{Optional<MessageId>::empty()};
+ Synchronized<boost::optional<MessageId>> startMessageId_;
Synchronized<MessageId> seekMessageId_{MessageId::earliest()};
class ChunkedMessageCtx {
@@ -321,11 +322,11 @@ class ConsumerImpl : public ConsumerImplBase {
* @return the concatenated payload if chunks are concatenated into a completed message payload
* successfully, else Optional::empty()
*/
- Optional<SharedBuffer> processMessageChunk(const SharedBuffer& payload,
- const proto::MessageMetadata& metadata,
- const MessageId& messageId,
- const proto::MessageIdData& messageIdData,
- const ClientConnectionPtr& cnx);
+ boost::optional<SharedBuffer> processMessageChunk(const SharedBuffer& payload,
+ const proto::MessageMetadata& metadata,
+ const MessageId& messageId,
+ const proto::MessageIdData& messageIdData,
+ const ClientConnectionPtr& cnx);
friend class PulsarFriend;
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index b8d55b4..d9d3873 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -373,7 +373,7 @@ void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic,
for (int i = 0; i < numberPartitions; i++) {
std::string topicPartitionName = topicName->getTopicPartitionName(i);
auto optConsumer = consumers_.find(topicPartitionName);
- if (optConsumer.is_empty()) {
+ if (!optConsumer) {
LOG_ERROR("TopicsConsumer not subscribed on topicPartitionName: " << topicPartitionName);
callback(ResultUnknownError);
continue;
@@ -400,7 +400,7 @@ void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync(
LOG_DEBUG("Successfully Unsubscribed one Consumer. topicPartitionName - " << topicPartitionName);
auto optConsumer = consumers_.remove(topicPartitionName);
- if (optConsumer.is_present()) {
+ if (optConsumer) {
optConsumer.value()->pauseMessageListener();
}
@@ -638,7 +638,7 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCal
const std::string& topicPartitionName = msgId.getTopicName();
auto optConsumer = consumers_.find(topicPartitionName);
- if (optConsumer.is_present()) {
+ if (optConsumer) {
unAckedMessageTrackerPtr_->remove(msgId);
optConsumer.value()->acknowledgeAsync(msgId, callback);
} else {
@@ -674,7 +674,7 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdLis
};
for (const auto& kv : topicToMessageId) {
auto optConsumer = consumers_.find(kv.first);
- if (optConsumer.is_present()) {
+ if (optConsumer) {
unAckedMessageTrackerPtr_->remove(kv.second);
optConsumer.value()->acknowledgeAsync(kv.second, cb);
} else {
@@ -691,7 +691,7 @@ void MultiTopicsConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId,
void MultiTopicsConsumerImpl::negativeAcknowledge(const MessageId& msgId) {
auto optConsumer = consumers_.find(msgId.getTopicName());
- if (optConsumer.is_present()) {
+ if (optConsumer) {
unAckedMessageTrackerPtr_->remove(msgId);
optConsumer.value()->negativeAcknowledge(msgId);
}
@@ -868,9 +868,8 @@ bool MultiTopicsConsumerImpl::isConnected() const {
return false;
}
- return consumers_
- .findFirstValueIf([](const ConsumerImplPtr& consumer) { return !consumer->isConnected(); })
- .is_empty();
+ return !consumers_.findFirstValueIf(
+ [](const ConsumerImplPtr& consumer) { return !consumer->isConnected(); });
}
uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() {
diff --git a/lib/ProducerConfiguration.cc b/lib/ProducerConfiguration.cc
index 9b3fdfb..67e8b10 100644
--- a/lib/ProducerConfiguration.cc
+++ b/lib/ProducerConfiguration.cc
@@ -36,21 +36,21 @@ ProducerConfiguration& ProducerConfiguration::operator=(const ProducerConfigurat
}
ProducerConfiguration& ProducerConfiguration::setProducerName(const std::string& producerName) {
- impl_->producerName = Optional<std::string>::of(producerName);
+ impl_->producerName = boost::make_optional(producerName);
return *this;
}
const std::string& ProducerConfiguration::getProducerName() const {
- return impl_->producerName.is_present() ? impl_->producerName.value() : emptyString;
+ return !impl_->producerName ? emptyString : impl_->producerName.value();
}
ProducerConfiguration& ProducerConfiguration::setInitialSequenceId(int64_t initialSequenceId) {
- impl_->initialSequenceId = Optional<int64_t>::of(initialSequenceId);
+ impl_->initialSequenceId = boost::make_optional(initialSequenceId);
return *this;
}
int64_t ProducerConfiguration::getInitialSequenceId() const {
- return impl_->initialSequenceId.is_present() ? impl_->initialSequenceId.value() : -1ll;
+ return !impl_->initialSequenceId ? -1ll : impl_->initialSequenceId.value();
}
ProducerConfiguration& ProducerConfiguration::setSendTimeout(int sendTimeoutMs) {
diff --git a/lib/ProducerConfigurationImpl.h b/lib/ProducerConfigurationImpl.h
index 6c2b19d..83723f6 100644
--- a/lib/ProducerConfigurationImpl.h
+++ b/lib/ProducerConfigurationImpl.h
@@ -21,16 +21,15 @@
#include <pulsar/ProducerConfiguration.h>
+#include <boost/optional.hpp>
#include <memory>
-#include "Utils.h"
-
namespace pulsar {
struct ProducerConfigurationImpl {
SchemaInfo schemaInfo;
- Optional<std::string> producerName;
- Optional<int64_t> initialSequenceId;
+ boost::optional<std::string> producerName;
+ boost::optional<int64_t> initialSequenceId;
int sendTimeoutMs{30000};
CompressionType compressionType{CompressionNone};
int maxPendingMessages{1000};
diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h
index cbf99ec..928fca5 100644
--- a/lib/ProducerImpl.h
+++ b/lib/ProducerImpl.h
@@ -19,6 +19,7 @@
#ifndef LIB_PRODUCERIMPL_H_
#define LIB_PRODUCERIMPL_H_
+#include <boost/optional.hpp>
#include <memory>
#include "Future.h"
@@ -31,7 +32,6 @@
#include "PeriodicTask.h"
#include "ProducerImplBase.h"
#include "Semaphore.h"
-#include "Utils.h"
namespace pulsar {
@@ -195,7 +195,7 @@ class ProducerImpl : public HandlerBase,
MemoryLimitController& memoryLimitController_;
const bool chunkingEnabled_;
- Optional<uint64_t> topicEpoch{Optional<uint64_t>::empty()};
+ boost::optional<uint64_t> topicEpoch;
};
struct ProducerImplCmp {
diff --git a/lib/ReaderImpl.cc b/lib/ReaderImpl.cc
index d1b25b5..a80e2e5 100644
--- a/lib/ReaderImpl.cc
+++ b/lib/ReaderImpl.cc
@@ -80,10 +80,9 @@ void ReaderImpl::start(const MessageId& startMessageId,
test::consumerConfigOfReader = consumerConf.clone();
}
- consumer_ = std::make_shared<ConsumerImpl>(client_.lock(), topic_, subscription, consumerConf,
- TopicName::get(topic_)->isPersistent(), ExecutorServicePtr(),
- false, NonPartitioned, Commands::SubscriptionModeNonDurable,
- Optional<MessageId>::of(startMessageId));
+ consumer_ = std::make_shared<ConsumerImpl>(
+ client_.lock(), topic_, subscription, consumerConf, TopicName::get(topic_)->isPersistent(),
+ ExecutorServicePtr(), false, NonPartitioned, Commands::SubscriptionModeNonDurable, startMessageId);
consumer_->setPartitionIndex(TopicName::getPartitionIndex(topic_));
auto self = shared_from_this();
consumer_->getConsumerCreatedFuture().addListener(
diff --git a/lib/SynchronizedHashMap.h b/lib/SynchronizedHashMap.h
index b8a8c91..a274bd8 100644
--- a/lib/SynchronizedHashMap.h
+++ b/lib/SynchronizedHashMap.h
@@ -18,14 +18,13 @@
*/
#pragma once
+#include <boost/optional.hpp>
#include <functional>
#include <mutex>
#include <unordered_map>
#include <utility>
#include <vector>
-#include "Utils.h"
-
namespace pulsar {
// V must be default constructible and copyable
@@ -35,7 +34,7 @@ class SynchronizedHashMap {
using Lock = std::lock_guard<MutexType>;
public:
- using OptValue = Optional<V>;
+ using OptValue = boost::optional<V>;
using PairVector = std::vector<std::pair<K, V>>;
using MapType = std::unordered_map<K, V>;
using Iterator = typename MapType::iterator;
@@ -85,9 +84,9 @@ class SynchronizedHashMap {
Lock lock(mutex_);
auto it = data_.find(key);
if (it != data_.end()) {
- return OptValue::of(it->second);
+ return it->second;
} else {
- return OptValue::empty();
+ return boost::none;
}
}
@@ -95,21 +94,21 @@ class SynchronizedHashMap {
Lock lock(mutex_);
for (const auto& kv : data_) {
if (f(kv.second)) {
- return OptValue::of(kv.second);
+ return kv.second;
}
}
- return OptValue::empty();
+ return boost::none;
}
OptValue remove(const K& key) {
Lock lock(mutex_);
auto it = data_.find(key);
if (it != data_.end()) {
- auto result = OptValue::of(std::move(it->second));
+ auto result = boost::make_optional(std::move(it->second));
data_.erase(it);
return result;
} else {
- return OptValue::empty();
+ return boost::none;
}
}
diff --git a/lib/Utils.h b/lib/Utils.h
index 016f09f..d6303a3 100644
--- a/lib/Utils.h
+++ b/lib/Utils.h
@@ -69,38 +69,6 @@ inline std::ostream& operator<<(std::ostream& os, const std::map<Result, unsigne
return os;
}
-/**
- * Utility class that encloses an optional value
- */
-template <typename T>
-class Optional {
- public:
- const T& value() const { return value_; }
-
- bool is_present() const { return present_; }
-
- bool is_empty() const { return !present_; }
-
- /**
- * Create an Optional with the bound value
- */
- static Optional<T> of(const T& value) { return Optional<T>(value); }
- static Optional<T> of(T&& value) { return Optional<T>(std::move(value)); }
-
- /**
- * Create an empty optional
- */
- static Optional<T> empty() { return Optional<T>(); }
-
- Optional() : value_(), present_(false) {}
-
- private:
- Optional(const T& value) : value_(value), present_(true) {}
- Optional(T&& value) : value_(std::move(value)), present_(true) {}
-
- T value_;
- bool present_;
-};
} // namespace pulsar
#endif /* UTILS_HPP_ */
diff --git a/tests/SynchronizedHashMapTest.cc b/tests/SynchronizedHashMapTest.cc
index 87cbe1e..85378e0 100644
--- a/tests/SynchronizedHashMapTest.cc
+++ b/tests/SynchronizedHashMapTest.cc
@@ -59,25 +59,25 @@ TEST(SynchronizedHashMap, testRemoveAndFind) {
OptValue optValue;
optValue = m.findFirstValueIf([](const int& x) { return x == 200; });
- ASSERT_TRUE(optValue.is_present());
+ ASSERT_TRUE(optValue);
ASSERT_EQ(optValue.value(), 200);
optValue = m.findFirstValueIf([](const int& x) { return x >= 301; });
- ASSERT_FALSE(optValue.is_present());
+ ASSERT_FALSE(optValue);
optValue = m.find(1);
- ASSERT_TRUE(optValue.is_present());
+ ASSERT_TRUE(optValue);
ASSERT_EQ(optValue.value(), 100);
- ASSERT_FALSE(m.find(0).is_present());
- ASSERT_FALSE(m.remove(0).is_present());
+ ASSERT_FALSE(m.find(0));
+ ASSERT_FALSE(m.remove(0));
optValue = m.remove(1);
- ASSERT_TRUE(optValue.is_present());
+ ASSERT_TRUE(optValue);
ASSERT_EQ(optValue.value(), 100);
- ASSERT_FALSE(m.remove(1).is_present());
- ASSERT_FALSE(m.find(1).is_present());
+ ASSERT_FALSE(m.remove(1));
+ ASSERT_FALSE(m.find(1));
}
TEST(SynchronizedHashMapTest, testForEach) {
@@ -99,7 +99,7 @@ TEST(SynchronizedHashMap, testRecursiveMutex) {
m.forEach([&m, &optValue](const int& key, const int& value) {
optValue = m.find(key); // the internal mutex was locked again
});
- ASSERT_TRUE(optValue.is_present());
+ ASSERT_TRUE(optValue);
ASSERT_EQ(optValue.value(), 100);
}