You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/12/07 03:13:40 UTC

[pulsar-client-cpp] branch main updated: Use boost::optional instead self-written Optional class (#138)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 18dcdd5  Use boost::optional instead self-written Optional class (#138)
18dcdd5 is described below

commit 18dcdd55afa4694933d293cda4035365dcb720a4
Author: A <fr...@ngs.ru>
AuthorDate: Wed Dec 7 10:13:35 2022 +0700

    Use boost::optional instead self-written Optional class (#138)
    
    Fixes #134
    
    ### Motivation
    
    Switch to use boost::optional instead self-written Optional class
    
    ### Modifications
    
    Remove class Optional in Utils.h and change all usage to boost::optional
---
 lib/ClientConnection.cc          |  5 +++--
 lib/ClientConnection.h           |  4 ++--
 lib/Commands.cc                  |  8 ++++----
 lib/Commands.h                   | 22 ++++++++++------------
 lib/ConsumerImpl.cc              | 40 ++++++++++++++++++++--------------------
 lib/ConsumerImpl.h               | 17 +++++++++--------
 lib/MultiTopicsConsumerImpl.cc   | 15 +++++++--------
 lib/ProducerConfiguration.cc     |  8 ++++----
 lib/ProducerConfigurationImpl.h  |  7 +++----
 lib/ProducerImpl.h               |  4 ++--
 lib/ReaderImpl.cc                |  7 +++----
 lib/SynchronizedHashMap.h        | 17 ++++++++---------
 lib/Utils.h                      | 32 --------------------------------
 tests/SynchronizedHashMapTest.cc | 18 +++++++++---------
 14 files changed, 84 insertions(+), 120 deletions(-)

diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 5c14467..51a09f4 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -20,6 +20,7 @@
 
 #include <pulsar/MessageIdBuilder.h>
 
+#include <boost/optional.hpp>
 #include <fstream>
 
 #include "Commands.h"
@@ -1093,9 +1094,9 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
                                 data.schemaVersion = producerSuccess.schema_version();
                             }
                             if (producerSuccess.has_topic_epoch()) {
-                                data.topicEpoch = Optional<uint64_t>::of(producerSuccess.topic_epoch());
+                                data.topicEpoch = boost::make_optional(producerSuccess.topic_epoch());
                             } else {
-                                data.topicEpoch = Optional<uint64_t>::empty();
+                                data.topicEpoch = boost::none;
                             }
                             requestData.promise.setValue(data);
                             requestData.timer->cancel();
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index a07e2cd..62a5e9b 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -29,6 +29,7 @@
 #include <boost/asio/ip/tcp.hpp>
 #include <boost/asio/ssl/stream.hpp>
 #include <boost/asio/strand.hpp>
+#include <boost/optional.hpp>
 #include <deque>
 #include <functional>
 #include <memory>
@@ -40,7 +41,6 @@
 #include "LookupDataResult.h"
 #include "SharedBuffer.h"
 #include "UtilAllocator.h"
-#include "Utils.h"
 
 namespace pulsar {
 
@@ -83,7 +83,7 @@ struct ResponseData {
     std::string producerName;
     int64_t lastSequenceId;
     std::string schemaVersion;
-    Optional<uint64_t> topicEpoch;
+    boost::optional<uint64_t> topicEpoch;
 };
 
 typedef std::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;
diff --git a/lib/Commands.cc b/lib/Commands.cc
index f97b0eb..5bb9587 100644
--- a/lib/Commands.cc
+++ b/lib/Commands.cc
@@ -296,7 +296,7 @@ SharedBuffer Commands::newAuthResponse(const AuthenticationPtr& authentication,
 SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string& subscription,
                                     uint64_t consumerId, uint64_t requestId, CommandSubscribe_SubType subType,
                                     const std::string& consumerName, SubscriptionMode subscriptionMode,
-                                    Optional<MessageId> startMessageId, bool readCompacted,
+                                    boost::optional<MessageId> startMessageId, bool readCompacted,
                                     const std::map<std::string, std::string>& metadata,
                                     const std::map<std::string, std::string>& subscriptionProperties,
                                     const SchemaInfo& schemaInfo,
@@ -323,7 +323,7 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string&
         subscribe->set_allocated_schema(getSchema(schemaInfo));
     }
 
-    if (startMessageId.is_present()) {
+    if (startMessageId) {
         MessageIdData& messageIdData = *subscribe->mutable_start_message_id();
         messageIdData.set_ledgerid(startMessageId.value().ledgerId());
         messageIdData.set_entryid(startMessageId.value().entryId());
@@ -383,7 +383,7 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId
                                    const std::map<std::string, std::string>& metadata,
                                    const SchemaInfo& schemaInfo, uint64_t epoch,
                                    bool userProvidedProducerName, bool encrypted,
-                                   ProducerAccessMode accessMode, Optional<uint64_t> topicEpoch) {
+                                   ProducerAccessMode accessMode, boost::optional<uint64_t> topicEpoch) {
     BaseCommand cmd;
     cmd.set_type(BaseCommand::PRODUCER);
     CommandProducer* producer = cmd.mutable_producer();
@@ -394,7 +394,7 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId
     producer->set_user_provided_producer_name(userProvidedProducerName);
     producer->set_encrypted(encrypted);
     producer->set_producer_access_mode(static_cast<proto::ProducerAccessMode>(accessMode));
-    if (topicEpoch.is_present()) {
+    if (topicEpoch) {
         producer->set_topic_epoch(topicEpoch.value());
     }
 
diff --git a/lib/Commands.h b/lib/Commands.h
index 6681f13..bbe96fd 100644
--- a/lib/Commands.h
+++ b/lib/Commands.h
@@ -25,11 +25,11 @@
 #include <pulsar/Schema.h>
 #include <pulsar/defines.h>
 
+#include <boost/optional.hpp>
 #include <set>
 
 #include "ProtoApiEnums.h"
 #include "SharedBuffer.h"
-#include "Utils.h"
 
 using namespace pulsar;
 
@@ -89,16 +89,14 @@ class Commands {
                                     uint64_t sequenceId, ChecksumType checksumType,
                                     const proto::MessageMetadata& metadata, const SharedBuffer& payload);
 
-    static SharedBuffer newSubscribe(const std::string& topic, const std::string& subscription,
-                                     uint64_t consumerId, uint64_t requestId,
-                                     CommandSubscribe_SubType subType, const std::string& consumerName,
-                                     SubscriptionMode subscriptionMode, Optional<MessageId> startMessageId,
-                                     bool readCompacted, const std::map<std::string, std::string>& metadata,
-                                     const std::map<std::string, std::string>& subscriptionProperties,
-                                     const SchemaInfo& schemaInfo,
-                                     CommandSubscribe_InitialPosition subscriptionInitialPosition,
-                                     bool replicateSubscriptionState, KeySharedPolicy keySharedPolicy,
-                                     int priorityLevel = 0);
+    static SharedBuffer newSubscribe(
+        const std::string& topic, const std::string& subscription, uint64_t consumerId, uint64_t requestId,
+        CommandSubscribe_SubType subType, const std::string& consumerName, SubscriptionMode subscriptionMode,
+        boost::optional<MessageId> startMessageId, bool readCompacted,
+        const std::map<std::string, std::string>& metadata,
+        const std::map<std::string, std::string>& subscriptionProperties, const SchemaInfo& schemaInfo,
+        CommandSubscribe_InitialPosition subscriptionInitialPosition, bool replicateSubscriptionState,
+        KeySharedPolicy keySharedPolicy, int priorityLevel = 0);
 
     static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t requestId);
 
@@ -107,7 +105,7 @@ class Commands {
                                     const std::map<std::string, std::string>& metadata,
                                     const SchemaInfo& schemaInfo, uint64_t epoch,
                                     bool userProvidedProducerName, bool encrypted,
-                                    ProducerAccessMode accessMode, Optional<uint64_t> topicEpoch);
+                                    ProducerAccessMode accessMode, boost::optional<uint64_t> topicEpoch);
 
     static SharedBuffer newAck(uint64_t consumerId, int64_t ledgerId, int64_t entryId,
                                CommandAck_AckType ackType, CommandAck_ValidationError validationError);
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 41b13ae..cf01e36 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -54,7 +54,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
                            const ExecutorServicePtr listenerExecutor /* = NULL by default */,
                            bool hasParent /* = false by default */,
                            const ConsumerTopicType consumerTopicType /* = NonPartitioned by default */,
-                           Commands::SubscriptionMode subscriptionMode, Optional<MessageId> startMessageId)
+                           Commands::SubscriptionMode subscriptionMode,
+                           boost::optional<MessageId> startMessageId)
     : ConsumerImplBase(client, topic, Backoff(milliseconds(100), seconds(60), milliseconds(0)), conf,
                        listenerExecutor ? listenerExecutor : client->getListenerExecutorProvider()->get()),
       waitingForZeroQueueSizeMessage(false),
@@ -191,9 +192,8 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
     Lock lockForMessageId(mutexForMessageId_);
     // Update startMessageId so that we can discard messages after delivery restarts
     const auto startMessageId = clearReceiveQueue();
-    const auto subscribeMessageId = (subscriptionMode_ == Commands::SubscriptionModeNonDurable)
-                                        ? startMessageId
-                                        : Optional<MessageId>::empty();
+    const auto subscribeMessageId =
+        (subscriptionMode_ == Commands::SubscriptionModeNonDurable) ? startMessageId : boost::none;
     startMessageId_ = startMessageId;
     lockForMessageId.unlock();
 
@@ -373,11 +373,11 @@ void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
     });
 }
 
-Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& payload,
-                                                         const proto::MessageMetadata& metadata,
-                                                         const MessageId& messageId,
-                                                         const proto::MessageIdData& messageIdData,
-                                                         const ClientConnectionPtr& cnx) {
+boost::optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& payload,
+                                                                const proto::MessageMetadata& metadata,
+                                                                const MessageId& messageId,
+                                                                const proto::MessageIdData& messageIdData,
+                                                                const ClientConnectionPtr& cnx) {
     const auto chunkId = metadata.chunk_id();
     const auto uuid = metadata.uuid();
     LOG_DEBUG("Process message chunk (chunkId: " << chunkId << ", uuid: " << uuid
@@ -422,14 +422,14 @@ Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& pay
         lock.unlock();
         increaseAvailablePermits(cnx);
         trackMessage(messageId);
-        return Optional<SharedBuffer>::empty();
+        return boost::none;
     }
 
     chunkedMsgCtx.appendChunk(messageId, payload);
     if (!chunkedMsgCtx.isCompleted()) {
         lock.unlock();
         increaseAvailablePermits(cnx);
-        return Optional<SharedBuffer>::empty();
+        return boost::none;
     }
 
     LOG_DEBUG("Chunked message completed chunkId: " << chunkId << ", ChunkedMessageCtx: " << chunkedMsgCtx
@@ -438,9 +438,9 @@ Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& pay
     auto wholePayload = chunkedMsgCtx.getBuffer();
     chunkedMessageCache_.remove(uuid);
     if (uncompressMessageIfNeeded(cnx, messageIdData, metadata, wholePayload, false)) {
-        return Optional<SharedBuffer>::of(wholePayload);
+        return wholePayload;
     } else {
-        return Optional<SharedBuffer>::empty();
+        return boost::none;
     }
 }
 
@@ -477,7 +477,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
         const auto& messageIdData = msg.message_id();
         auto messageId = MessageIdBuilder::from(messageIdData).build();
         auto optionalPayload = processMessageChunk(payload, metadata, messageId, messageIdData, cnx);
-        if (optionalPayload.is_present()) {
+        if (optionalPayload) {
             payload = optionalPayload.value();
         } else {
             return;
@@ -512,7 +512,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
         m.impl_->convertPayloadToKeyValue(config_.getSchema());
 
         const auto startMessageId = startMessageId_.get();
-        if (isPersistent_ && startMessageId.is_present() &&
+        if (isPersistent_ && startMessageId &&
             m.getMessageId().ledgerId() == startMessageId.value().ledgerId() &&
             m.getMessageId().entryId() == startMessageId.value().entryId() &&
             isPriorEntryIndex(m.getMessageId().entryId())) {
@@ -637,7 +637,7 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
         msg.impl_->setTopicName(batchedMessage.getTopicName());
         msg.impl_->convertPayloadToKeyValue(config_.getSchema());
 
-        if (startMessageId.is_present()) {
+        if (startMessageId) {
             const MessageId& msgId = msg.getMessageId();
 
             // If we are receiving a batch message, we need to discard messages that were prior
@@ -921,10 +921,10 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) {
  * was
  * not seen by the application
  */
-Optional<MessageId> ConsumerImpl::clearReceiveQueue() {
+boost::optional<MessageId> ConsumerImpl::clearReceiveQueue() {
     bool expectedDuringSeek = true;
     if (duringSeek_.compare_exchange_strong(expectedDuringSeek, false)) {
-        return Optional<MessageId>::of(seekMessageId_.get());
+        return seekMessageId_.get();
     } else if (subscriptionMode_ == Commands::SubscriptionModeDurable) {
         return startMessageId_.get();
     }
@@ -943,12 +943,12 @@ Optional<MessageId> ConsumerImpl::clearReceiveQueue() {
                                            .ledgerId(nextMessageId.ledgerId())
                                            .entryId(nextMessageId.entryId() - 1)
                                            .build();
-        return Optional<MessageId>::of(previousMessageId);
+        return previousMessageId;
     } else if (lastDequedMessageId_ != MessageId::earliest()) {
         // If the queue was empty we need to restart from the message just after the last one that has been
         // dequeued
         // in the past
-        return Optional<MessageId>::of(lastDequedMessageId_);
+        return lastDequedMessageId_;
     } else {
         // No message was received or dequeued by this consumer. Next message would still be the
         // startMessageId
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index ce6c3f0..e9af3f6 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -21,6 +21,7 @@
 
 #include <pulsar/Reader.h>
 
+#include <boost/optional.hpp>
 #include <functional>
 #include <memory>
 
@@ -71,7 +72,7 @@ class ConsumerImpl : public ConsumerImplBase {
                  const ExecutorServicePtr listenerExecutor = ExecutorServicePtr(), bool hasParent = false,
                  const ConsumerTopicType consumerTopicType = NonPartitioned,
                  Commands::SubscriptionMode = Commands::SubscriptionModeDurable,
-                 Optional<MessageId> startMessageId = Optional<MessageId>::empty());
+                 boost::optional<MessageId> startMessageId = boost::none);
     ~ConsumerImpl();
     void setPartitionIndex(int partitionIndex);
     int getPartitionIndex();
@@ -193,7 +194,7 @@ class ConsumerImpl : public ConsumerImplBase {
                                        const DeadlineTimerPtr& timer,
                                        BrokerGetLastMessageIdCallback callback);
 
-    Optional<MessageId> clearReceiveQueue();
+    boost::optional<MessageId> clearReceiveQueue();
     void seekAsyncInternal(long requestId, SharedBuffer seek, const MessageId& seekId, long timestamp,
                            ResultCallback callback);
 
@@ -236,7 +237,7 @@ class ConsumerImpl : public ConsumerImplBase {
     MessageId lastMessageIdInBroker_{MessageId::earliest()};
 
     std::atomic_bool duringSeek_{false};
-    Synchronized<Optional<MessageId>> startMessageId_{Optional<MessageId>::empty()};
+    Synchronized<boost::optional<MessageId>> startMessageId_;
     Synchronized<MessageId> seekMessageId_{MessageId::earliest()};
 
     class ChunkedMessageCtx {
@@ -321,11 +322,11 @@ class ConsumerImpl : public ConsumerImplBase {
      * @return the concatenated payload if chunks are concatenated into a completed message payload
      *   successfully, else Optional::empty()
      */
-    Optional<SharedBuffer> processMessageChunk(const SharedBuffer& payload,
-                                               const proto::MessageMetadata& metadata,
-                                               const MessageId& messageId,
-                                               const proto::MessageIdData& messageIdData,
-                                               const ClientConnectionPtr& cnx);
+    boost::optional<SharedBuffer> processMessageChunk(const SharedBuffer& payload,
+                                                      const proto::MessageMetadata& metadata,
+                                                      const MessageId& messageId,
+                                                      const proto::MessageIdData& messageIdData,
+                                                      const ClientConnectionPtr& cnx);
 
     friend class PulsarFriend;
 
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index b8d55b4..d9d3873 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -373,7 +373,7 @@ void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic,
     for (int i = 0; i < numberPartitions; i++) {
         std::string topicPartitionName = topicName->getTopicPartitionName(i);
         auto optConsumer = consumers_.find(topicPartitionName);
-        if (optConsumer.is_empty()) {
+        if (!optConsumer) {
             LOG_ERROR("TopicsConsumer not subscribed on topicPartitionName: " << topicPartitionName);
             callback(ResultUnknownError);
             continue;
@@ -400,7 +400,7 @@ void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync(
     LOG_DEBUG("Successfully Unsubscribed one Consumer. topicPartitionName - " << topicPartitionName);
 
     auto optConsumer = consumers_.remove(topicPartitionName);
-    if (optConsumer.is_present()) {
+    if (optConsumer) {
         optConsumer.value()->pauseMessageListener();
     }
 
@@ -638,7 +638,7 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCal
     const std::string& topicPartitionName = msgId.getTopicName();
     auto optConsumer = consumers_.find(topicPartitionName);
 
-    if (optConsumer.is_present()) {
+    if (optConsumer) {
         unAckedMessageTrackerPtr_->remove(msgId);
         optConsumer.value()->acknowledgeAsync(msgId, callback);
     } else {
@@ -674,7 +674,7 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdLis
     };
     for (const auto& kv : topicToMessageId) {
         auto optConsumer = consumers_.find(kv.first);
-        if (optConsumer.is_present()) {
+        if (optConsumer) {
             unAckedMessageTrackerPtr_->remove(kv.second);
             optConsumer.value()->acknowledgeAsync(kv.second, cb);
         } else {
@@ -691,7 +691,7 @@ void MultiTopicsConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId,
 void MultiTopicsConsumerImpl::negativeAcknowledge(const MessageId& msgId) {
     auto optConsumer = consumers_.find(msgId.getTopicName());
 
-    if (optConsumer.is_present()) {
+    if (optConsumer) {
         unAckedMessageTrackerPtr_->remove(msgId);
         optConsumer.value()->negativeAcknowledge(msgId);
     }
@@ -868,9 +868,8 @@ bool MultiTopicsConsumerImpl::isConnected() const {
         return false;
     }
 
-    return consumers_
-        .findFirstValueIf([](const ConsumerImplPtr& consumer) { return !consumer->isConnected(); })
-        .is_empty();
+    return !consumers_.findFirstValueIf(
+        [](const ConsumerImplPtr& consumer) { return !consumer->isConnected(); });
 }
 
 uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() {
diff --git a/lib/ProducerConfiguration.cc b/lib/ProducerConfiguration.cc
index 9b3fdfb..67e8b10 100644
--- a/lib/ProducerConfiguration.cc
+++ b/lib/ProducerConfiguration.cc
@@ -36,21 +36,21 @@ ProducerConfiguration& ProducerConfiguration::operator=(const ProducerConfigurat
 }
 
 ProducerConfiguration& ProducerConfiguration::setProducerName(const std::string& producerName) {
-    impl_->producerName = Optional<std::string>::of(producerName);
+    impl_->producerName = boost::make_optional(producerName);
     return *this;
 }
 
 const std::string& ProducerConfiguration::getProducerName() const {
-    return impl_->producerName.is_present() ? impl_->producerName.value() : emptyString;
+    return !impl_->producerName ? emptyString : impl_->producerName.value();
 }
 
 ProducerConfiguration& ProducerConfiguration::setInitialSequenceId(int64_t initialSequenceId) {
-    impl_->initialSequenceId = Optional<int64_t>::of(initialSequenceId);
+    impl_->initialSequenceId = boost::make_optional(initialSequenceId);
     return *this;
 }
 
 int64_t ProducerConfiguration::getInitialSequenceId() const {
-    return impl_->initialSequenceId.is_present() ? impl_->initialSequenceId.value() : -1ll;
+    return !impl_->initialSequenceId ? -1ll : impl_->initialSequenceId.value();
 }
 
 ProducerConfiguration& ProducerConfiguration::setSendTimeout(int sendTimeoutMs) {
diff --git a/lib/ProducerConfigurationImpl.h b/lib/ProducerConfigurationImpl.h
index 6c2b19d..83723f6 100644
--- a/lib/ProducerConfigurationImpl.h
+++ b/lib/ProducerConfigurationImpl.h
@@ -21,16 +21,15 @@
 
 #include <pulsar/ProducerConfiguration.h>
 
+#include <boost/optional.hpp>
 #include <memory>
 
-#include "Utils.h"
-
 namespace pulsar {
 
 struct ProducerConfigurationImpl {
     SchemaInfo schemaInfo;
-    Optional<std::string> producerName;
-    Optional<int64_t> initialSequenceId;
+    boost::optional<std::string> producerName;
+    boost::optional<int64_t> initialSequenceId;
     int sendTimeoutMs{30000};
     CompressionType compressionType{CompressionNone};
     int maxPendingMessages{1000};
diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h
index cbf99ec..928fca5 100644
--- a/lib/ProducerImpl.h
+++ b/lib/ProducerImpl.h
@@ -19,6 +19,7 @@
 #ifndef LIB_PRODUCERIMPL_H_
 #define LIB_PRODUCERIMPL_H_
 
+#include <boost/optional.hpp>
 #include <memory>
 
 #include "Future.h"
@@ -31,7 +32,6 @@
 #include "PeriodicTask.h"
 #include "ProducerImplBase.h"
 #include "Semaphore.h"
-#include "Utils.h"
 
 namespace pulsar {
 
@@ -195,7 +195,7 @@ class ProducerImpl : public HandlerBase,
 
     MemoryLimitController& memoryLimitController_;
     const bool chunkingEnabled_;
-    Optional<uint64_t> topicEpoch{Optional<uint64_t>::empty()};
+    boost::optional<uint64_t> topicEpoch;
 };
 
 struct ProducerImplCmp {
diff --git a/lib/ReaderImpl.cc b/lib/ReaderImpl.cc
index d1b25b5..a80e2e5 100644
--- a/lib/ReaderImpl.cc
+++ b/lib/ReaderImpl.cc
@@ -80,10 +80,9 @@ void ReaderImpl::start(const MessageId& startMessageId,
         test::consumerConfigOfReader = consumerConf.clone();
     }
 
-    consumer_ = std::make_shared<ConsumerImpl>(client_.lock(), topic_, subscription, consumerConf,
-                                               TopicName::get(topic_)->isPersistent(), ExecutorServicePtr(),
-                                               false, NonPartitioned, Commands::SubscriptionModeNonDurable,
-                                               Optional<MessageId>::of(startMessageId));
+    consumer_ = std::make_shared<ConsumerImpl>(
+        client_.lock(), topic_, subscription, consumerConf, TopicName::get(topic_)->isPersistent(),
+        ExecutorServicePtr(), false, NonPartitioned, Commands::SubscriptionModeNonDurable, startMessageId);
     consumer_->setPartitionIndex(TopicName::getPartitionIndex(topic_));
     auto self = shared_from_this();
     consumer_->getConsumerCreatedFuture().addListener(
diff --git a/lib/SynchronizedHashMap.h b/lib/SynchronizedHashMap.h
index b8a8c91..a274bd8 100644
--- a/lib/SynchronizedHashMap.h
+++ b/lib/SynchronizedHashMap.h
@@ -18,14 +18,13 @@
  */
 #pragma once
 
+#include <boost/optional.hpp>
 #include <functional>
 #include <mutex>
 #include <unordered_map>
 #include <utility>
 #include <vector>
 
-#include "Utils.h"
-
 namespace pulsar {
 
 // V must be default constructible and copyable
@@ -35,7 +34,7 @@ class SynchronizedHashMap {
     using Lock = std::lock_guard<MutexType>;
 
    public:
-    using OptValue = Optional<V>;
+    using OptValue = boost::optional<V>;
     using PairVector = std::vector<std::pair<K, V>>;
     using MapType = std::unordered_map<K, V>;
     using Iterator = typename MapType::iterator;
@@ -85,9 +84,9 @@ class SynchronizedHashMap {
         Lock lock(mutex_);
         auto it = data_.find(key);
         if (it != data_.end()) {
-            return OptValue::of(it->second);
+            return it->second;
         } else {
-            return OptValue::empty();
+            return boost::none;
         }
     }
 
@@ -95,21 +94,21 @@ class SynchronizedHashMap {
         Lock lock(mutex_);
         for (const auto& kv : data_) {
             if (f(kv.second)) {
-                return OptValue::of(kv.second);
+                return kv.second;
             }
         }
-        return OptValue::empty();
+        return boost::none;
     }
 
     OptValue remove(const K& key) {
         Lock lock(mutex_);
         auto it = data_.find(key);
         if (it != data_.end()) {
-            auto result = OptValue::of(std::move(it->second));
+            auto result = boost::make_optional(std::move(it->second));
             data_.erase(it);
             return result;
         } else {
-            return OptValue::empty();
+            return boost::none;
         }
     }
 
diff --git a/lib/Utils.h b/lib/Utils.h
index 016f09f..d6303a3 100644
--- a/lib/Utils.h
+++ b/lib/Utils.h
@@ -69,38 +69,6 @@ inline std::ostream& operator<<(std::ostream& os, const std::map<Result, unsigne
     return os;
 }
 
-/**
- * Utility class that encloses an optional value
- */
-template <typename T>
-class Optional {
-   public:
-    const T& value() const { return value_; }
-
-    bool is_present() const { return present_; }
-
-    bool is_empty() const { return !present_; }
-
-    /**
-     * Create an Optional with the bound value
-     */
-    static Optional<T> of(const T& value) { return Optional<T>(value); }
-    static Optional<T> of(T&& value) { return Optional<T>(std::move(value)); }
-
-    /**
-     * Create an empty optional
-     */
-    static Optional<T> empty() { return Optional<T>(); }
-
-    Optional() : value_(), present_(false) {}
-
-   private:
-    Optional(const T& value) : value_(value), present_(true) {}
-    Optional(T&& value) : value_(std::move(value)), present_(true) {}
-
-    T value_;
-    bool present_;
-};
 }  // namespace pulsar
 
 #endif /* UTILS_HPP_ */
diff --git a/tests/SynchronizedHashMapTest.cc b/tests/SynchronizedHashMapTest.cc
index 87cbe1e..85378e0 100644
--- a/tests/SynchronizedHashMapTest.cc
+++ b/tests/SynchronizedHashMapTest.cc
@@ -59,25 +59,25 @@ TEST(SynchronizedHashMap, testRemoveAndFind) {
 
     OptValue optValue;
     optValue = m.findFirstValueIf([](const int& x) { return x == 200; });
-    ASSERT_TRUE(optValue.is_present());
+    ASSERT_TRUE(optValue);
     ASSERT_EQ(optValue.value(), 200);
 
     optValue = m.findFirstValueIf([](const int& x) { return x >= 301; });
-    ASSERT_FALSE(optValue.is_present());
+    ASSERT_FALSE(optValue);
 
     optValue = m.find(1);
-    ASSERT_TRUE(optValue.is_present());
+    ASSERT_TRUE(optValue);
     ASSERT_EQ(optValue.value(), 100);
 
-    ASSERT_FALSE(m.find(0).is_present());
-    ASSERT_FALSE(m.remove(0).is_present());
+    ASSERT_FALSE(m.find(0));
+    ASSERT_FALSE(m.remove(0));
 
     optValue = m.remove(1);
-    ASSERT_TRUE(optValue.is_present());
+    ASSERT_TRUE(optValue);
     ASSERT_EQ(optValue.value(), 100);
 
-    ASSERT_FALSE(m.remove(1).is_present());
-    ASSERT_FALSE(m.find(1).is_present());
+    ASSERT_FALSE(m.remove(1));
+    ASSERT_FALSE(m.find(1));
 }
 
 TEST(SynchronizedHashMapTest, testForEach) {
@@ -99,7 +99,7 @@ TEST(SynchronizedHashMap, testRecursiveMutex) {
     m.forEach([&m, &optValue](const int& key, const int& value) {
         optValue = m.find(key);  // the internal mutex was locked again
     });
-    ASSERT_TRUE(optValue.is_present());
+    ASSERT_TRUE(optValue);
     ASSERT_EQ(optValue.value(), 100);
 }