You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/01/30 21:08:52 UTC

[pulsar] branch master updated: Replace boost::mutex and boost::thread with std::mutex and std::thread (#3461)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d14680  Replace boost::mutex and boost::thread with std::mutex and std::thread (#3461)
1d14680 is described below

commit 1d14680c8bcdae4159afc230b12c22545f855a58
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Jan 30 13:08:47 2019 -0800

    Replace boost::mutex and boost::thread with std::mutex and std::thread (#3461)
    
    * Replace boost::mutex and boost::thread with std::mutex and std::thread
    
    * Added missing mutex header
    
    * Added missing vector header
    
    * Removed extra tab
    
    * Added missing <atomic> include
    
    * Missing <memory> includes
---
 pulsar-client-cpp/CMakeLists.txt                   |  2 +-
 pulsar-client-cpp/lib/Authentication.cc            | 14 +++---
 .../lib/BatchAcknowledgementTracker.h              |  6 +--
 pulsar-client-cpp/lib/BinaryProtoLookupService.h   |  4 +-
 pulsar-client-cpp/lib/BlockingQueue.h              | 17 +++----
 pulsar-client-cpp/lib/ClientConnection.h           |  6 +--
 pulsar-client-cpp/lib/ClientImpl.cc                |  5 +-
 pulsar-client-cpp/lib/ClientImpl.h                 |  4 +-
 pulsar-client-cpp/lib/Commands.cc                  | 17 +++----
 pulsar-client-cpp/lib/CompressionCodecZLib.h       |  1 -
 pulsar-client-cpp/lib/ConnectionPool.cc            |  2 +-
 pulsar-client-cpp/lib/ConnectionPool.h             |  4 +-
 pulsar-client-cpp/lib/ConsumerImpl.cc              |  6 +--
 pulsar-client-cpp/lib/ConsumerImpl.h               |  4 +-
 pulsar-client-cpp/lib/ExecutorService.h            |  6 +--
 pulsar-client-cpp/lib/Future.h                     | 11 +++--
 pulsar-client-cpp/lib/HandlerBase.h                |  6 +--
 pulsar-client-cpp/lib/Latch.cc                     | 13 -----
 pulsar-client-cpp/lib/Latch.h                      | 24 +++++++---
 pulsar-client-cpp/lib/LogUtils.h                   |  4 +-
 pulsar-client-cpp/lib/LookupDataResult.h           |  3 ++
 pulsar-client-cpp/lib/LookupService.h              |  2 +
 pulsar-client-cpp/lib/MessageCrypto.h              |  6 +--
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc   |  4 +-
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h    |  6 +--
 pulsar-client-cpp/lib/ObjectPool.h                 | 16 +++----
 pulsar-client-cpp/lib/PartitionedConsumerImpl.cc   |  4 +-
 pulsar-client-cpp/lib/PartitionedConsumerImpl.h    |  6 +--
 pulsar-client-cpp/lib/PartitionedProducerImpl.h    |  6 +--
 pulsar-client-cpp/lib/ProducerImpl.h               |  4 +-
 pulsar-client-cpp/lib/RoundRobinMessageRouter.h    |  6 +--
 pulsar-client-cpp/lib/TopicName.cc                 |  4 +-
 pulsar-client-cpp/lib/TopicName.h                  |  4 +-
 .../lib/UnAckedMessageTrackerEnabled.cc            | 12 ++---
 .../lib/UnAckedMessageTrackerEnabled.h             |  5 +-
 .../lib/UnAckedMessageTrackerInterface.h           |  3 --
 pulsar-client-cpp/lib/UnboundedBlockingQueue.h     | 16 +++----
 pulsar-client-cpp/lib/Utils.h                      |  3 ++
 pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc     |  8 ++--
 pulsar-client-cpp/lib/auth/athenz/ZTSClient.h      |  2 +-
 pulsar-client-cpp/lib/stats/ConsumerStatsImpl.h    |  2 +-
 pulsar-client-cpp/lib/stats/ProducerStatsImpl.h    |  7 +--
 pulsar-client-cpp/perf/PerfConsumer.cc             |  7 +--
 pulsar-client-cpp/perf/PerfProducer.cc             | 55 ++++++++++++----------
 pulsar-client-cpp/perf/RateLimiter.h               |  6 +--
 pulsar-client-cpp/tests/AuthPluginTest.cc          |  6 +--
 pulsar-client-cpp/tests/AuthTokenTest.cc           |  1 -
 pulsar-client-cpp/tests/BasicEndToEndTest.cc       | 12 ++---
 pulsar-client-cpp/tests/BatchMessageTest.cc        |  1 -
 pulsar-client-cpp/tests/BlockingQueueTest.cc       | 17 +++----
 pulsar-client-cpp/tests/ConsumerStatsTest.cc       |  4 +-
 pulsar-client-cpp/tests/LatchTest.cc               | 42 ++++++++---------
 pulsar-client-cpp/tests/ZeroQueueSizeTest.cc       |  2 +-
 53 files changed, 221 insertions(+), 217 deletions(-)

diff --git a/pulsar-client-cpp/CMakeLists.txt b/pulsar-client-cpp/CMakeLists.txt
index 57aaf27..f122c79 100644
--- a/pulsar-client-cpp/CMakeLists.txt
+++ b/pulsar-client-cpp/CMakeLists.txt
@@ -106,7 +106,7 @@ else()
     endif (USE_LOG4CXX)
 endif (LINK_STATIC)
 
-find_package(Boost REQUIRED COMPONENTS program_options filesystem regex thread system)
+find_package(Boost REQUIRED COMPONENTS program_options filesystem regex system)
 
 if (BUILD_PYTHON_WRAPPER)
     find_package(PythonLibs REQUIRED)
diff --git a/pulsar-client-cpp/lib/Authentication.cc b/pulsar-client-cpp/lib/Authentication.cc
index 3b9c45f..f552b53 100644
--- a/pulsar-client-cpp/lib/Authentication.cc
+++ b/pulsar-client-cpp/lib/Authentication.cc
@@ -29,7 +29,7 @@
 #include <iostream>
 #include <dlfcn.h>
 #include <cstdlib>
-#include <boost/thread.hpp>
+#include <mutex>
 #include <boost/algorithm/string.hpp>
 
 DECLARE_LOG_OBJECT()
@@ -103,12 +103,12 @@ AuthenticationPtr AuthFactory::create(const std::string& pluginNameOrDynamicLibP
     return AuthFactory::create(pluginNameOrDynamicLibPath, params);
 }
 
-boost::mutex mutex;
+std::mutex mutex;
 std::vector<void*> AuthFactory::loadedLibrariesHandles_;
 bool AuthFactory::isShutdownHookRegistered_ = false;
 
 void AuthFactory::release_handles() {
-    boost::lock_guard<boost::mutex> lock(mutex);
+    std::lock_guard<std::mutex> lock(mutex);
     for (std::vector<void*>::iterator ite = AuthFactory::loadedLibrariesHandles_.begin();
          ite != AuthFactory::loadedLibrariesHandles_.end(); ite++) {
         dlclose(*ite);
@@ -147,7 +147,7 @@ AuthenticationPtr tryCreateBuiltinAuth(const std::string& pluginName, const std:
 AuthenticationPtr AuthFactory::create(const std::string& pluginNameOrDynamicLibPath,
                                       const std::string& authParamsString) {
     {
-        boost::lock_guard<boost::mutex> lock(mutex);
+        std::lock_guard<std::mutex> lock(mutex);
         if (!AuthFactory::isShutdownHookRegistered_) {
             atexit(release_handles);
             AuthFactory::isShutdownHookRegistered_ = true;
@@ -163,7 +163,7 @@ AuthenticationPtr AuthFactory::create(const std::string& pluginNameOrDynamicLibP
     void* handle = dlopen(pluginNameOrDynamicLibPath.c_str(), RTLD_LAZY);
     if (handle != NULL) {
         {
-            boost::lock_guard<boost::mutex> lock(mutex);
+            std::lock_guard<std::mutex> lock(mutex);
             loadedLibrariesHandles_.push_back(handle);
         }
         Authentication* (*createAuthentication)(const std::string&);
@@ -183,7 +183,7 @@ AuthenticationPtr AuthFactory::create(const std::string& pluginNameOrDynamicLibP
 
 AuthenticationPtr AuthFactory::create(const std::string& pluginNameOrDynamicLibPath, ParamMap& params) {
     {
-        boost::lock_guard<boost::mutex> lock(mutex);
+        std::lock_guard<std::mutex> lock(mutex);
         if (!AuthFactory::isShutdownHookRegistered_) {
             atexit(release_handles);
             AuthFactory::isShutdownHookRegistered_ = true;
@@ -198,7 +198,7 @@ AuthenticationPtr AuthFactory::create(const std::string& pluginNameOrDynamicLibP
     Authentication* auth = NULL;
     void* handle = dlopen(pluginNameOrDynamicLibPath.c_str(), RTLD_LAZY);
     if (handle != NULL) {
-        boost::lock_guard<boost::mutex> lock(mutex);
+        std::lock_guard<std::mutex> lock(mutex);
         loadedLibrariesHandles_.push_back(handle);
         Authentication* (*createAuthentication)(ParamMap&);
         *(void**)(&createAuthentication) = dlsym(handle, "createFromMap");
diff --git a/pulsar-client-cpp/lib/BatchAcknowledgementTracker.h b/pulsar-client-cpp/lib/BatchAcknowledgementTracker.h
index 6b20c00..6a709b3 100644
--- a/pulsar-client-cpp/lib/BatchAcknowledgementTracker.h
+++ b/pulsar-client-cpp/lib/BatchAcknowledgementTracker.h
@@ -21,7 +21,7 @@
 
 #include "MessageImpl.h"
 #include <map>
-#include <boost/thread/mutex.hpp>
+#include <mutex>
 #include <boost/dynamic_bitset.hpp>
 #include <lib/PulsarApi.pb.h>
 #include <algorithm>
@@ -34,10 +34,10 @@ class ConsumerImpl;
 
 class BatchAcknowledgementTracker {
    private:
-    typedef boost::unique_lock<boost::mutex> Lock;
+    typedef std::unique_lock<std::mutex> Lock;
     typedef std::pair<MessageId, boost::dynamic_bitset<> > TrackerPair;
     typedef std::map<MessageId, boost::dynamic_bitset<> > TrackerMap;
-    boost::mutex mutex_;
+    std::mutex mutex_;
 
     TrackerMap trackerMap_;
 
diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.h b/pulsar-client-cpp/lib/BinaryProtoLookupService.h
index fe193f4..7a9398a 100644
--- a/pulsar-client-cpp/lib/BinaryProtoLookupService.h
+++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.h
@@ -24,6 +24,8 @@
 #include "ConnectionPool.h"
 #include "Backoff.h"
 #include <lib/LookupService.h>
+#include <mutex>
+
 #pragma GCC visibility push(default)
 
 namespace pulsar {
@@ -43,7 +45,7 @@ class BinaryProtoLookupService : public LookupService {
     Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName);
 
    private:
-    boost::mutex mutex_;
+    std::mutex mutex_;
     uint64_t requestIdGenerator_;
 
     std::string serviceUrl_;
diff --git a/pulsar-client-cpp/lib/BlockingQueue.h b/pulsar-client-cpp/lib/BlockingQueue.h
index 44bfac6..39f8511 100644
--- a/pulsar-client-cpp/lib/BlockingQueue.h
+++ b/pulsar-client-cpp/lib/BlockingQueue.h
@@ -19,8 +19,8 @@
 #ifndef LIB_BLOCKINGQUEUE_H_
 #define LIB_BLOCKINGQUEUE_H_
 
-#include <boost/thread/mutex.hpp>
-#include <boost/thread/condition.hpp>
+#include <mutex>
+#include <condition_variable>
 #include <boost/circular_buffer.hpp>
 
 /**
@@ -199,9 +199,10 @@ class BlockingQueue {
         }
     }
 
-    bool pop(T& value, const boost::posix_time::time_duration& timeout) {
+    template <typename Duration>
+    bool pop(T& value, const Duration& timeout) {
         Lock lock(mutex_);
-        if (!queueEmptyCondition.timed_wait(lock, timeout, QueueNotEmpty<BlockingQueue<T> >(*this))) {
+        if (!queueEmptyCondition.wait_for(lock, timeout, QueueNotEmpty<BlockingQueue<T> >(*this))) {
             return false;
         }
 
@@ -279,13 +280,13 @@ class BlockingQueue {
     bool isFullNoMutex() const { return (queue_.size() + reservedSpots_) == maxSize_; }
 
     const size_t maxSize_;
-    mutable boost::mutex mutex_;
-    boost::condition_variable queueFullCondition;
-    boost::condition_variable queueEmptyCondition;
+    mutable std::mutex mutex_;
+    std::condition_variable queueFullCondition;
+    std::condition_variable queueEmptyCondition;
     Container queue_;
     int reservedSpots_;
 
-    typedef boost::unique_lock<boost::mutex> Lock;
+    typedef std::unique_lock<std::mutex> Lock;
     friend class QueueReservedSpot;
     friend struct QueueNotEmpty<BlockingQueue<T> >;
     friend struct QueueNotFull<BlockingQueue<T> >;
diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h
index e1cb734..38e3803 100644
--- a/pulsar-client-cpp/lib/ClientConnection.h
+++ b/pulsar-client-cpp/lib/ClientConnection.h
@@ -24,7 +24,7 @@
 #include <boost/asio.hpp>
 #include <boost/asio/ssl.hpp>
 #include <boost/any.hpp>
-#include <boost/thread/mutex.hpp>
+#include <mutex>
 #include <functional>
 #include <string>
 #include <vector>
@@ -288,8 +288,8 @@ class ClientConnection : public std::enable_shared_from_this<ClientConnection> {
     typedef std::map<long, Promise<Result, NamespaceTopicsPtr>> PendingGetNamespaceTopicsMap;
     PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_;
 
-    boost::mutex mutex_;
-    typedef boost::unique_lock<boost::mutex> Lock;
+    std::mutex mutex_;
+    typedef std::unique_lock<std::mutex> Lock;
 
     // Pending buffers to write on the socket
     std::deque<boost::any> pendingWriteBuffers_;
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index 8fa9120..f3b193d 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -27,16 +27,15 @@
 #include "MultiTopicsConsumerImpl.h"
 #include "PatternMultiTopicsConsumerImpl.h"
 #include "SimpleLoggerImpl.h"
-#include "Log4CxxLogger.h"
 #include <boost/bind.hpp>
 #include <boost/algorithm/string/predicate.hpp>
 #include <sstream>
 #include <openssl/sha.h>
-#include "boost/date_time/posix_time/posix_time.hpp"
 #include <lib/HTTPLookupService.h>
 #include <lib/TopicName.h>
 #include <algorithm>
 #include <regex>
+#include <mutex>
 
 DECLARE_LOG_OBJECT()
 
@@ -62,7 +61,7 @@ const std::string generateRandomName() {
 
     return hexHash.str();
 }
-typedef boost::unique_lock<boost::mutex> Lock;
+typedef std::unique_lock<std::mutex> Lock;
 
 typedef std::vector<std::string> StringList;
 
diff --git a/pulsar-client-cpp/lib/ClientImpl.h b/pulsar-client-cpp/lib/ClientImpl.h
index 7d3b879..46b7c2b 100644
--- a/pulsar-client-cpp/lib/ClientImpl.h
+++ b/pulsar-client-cpp/lib/ClientImpl.h
@@ -24,7 +24,7 @@
 #include "BinaryProtoLookupService.h"
 #include "ConnectionPool.h"
 #include "LookupDataResult.h"
-#include <boost/thread/mutex.hpp>
+#include <mutex>
 #include <lib/TopicName.h>
 #include "ProducerImplBase.h"
 #include "ConsumerImplBase.h"
@@ -125,7 +125,7 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
         Closed
     };
 
-    boost::mutex mutex_;
+    std::mutex mutex_;
 
     State state_;
     std::string serviceUrl_;
diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index 4c80154..384e54c 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -28,7 +28,7 @@
 #include <pulsar/Schema.h>
 #include "checksum/ChecksumProvider.h"
 #include <algorithm>
-#include <boost/thread/mutex.hpp>
+#include <mutex>
 
 using namespace pulsar;
 namespace pulsar {
@@ -98,22 +98,21 @@ SharedBuffer Commands::writeMessageWithSize(const BaseCommand& cmd) {
 
 SharedBuffer Commands::newPartitionMetadataRequest(const std::string& topic, uint64_t requestId) {
     static BaseCommand cmd;
-    static boost::mutex mutex;
-    mutex.lock();
+    static std::mutex mutex;
+    std::lock_guard<std::mutex> lock(mutex);
     cmd.set_type(BaseCommand::PARTITIONED_METADATA);
     CommandPartitionedTopicMetadata* partitionMetadata = cmd.mutable_partitionmetadata();
     partitionMetadata->set_topic(topic);
     partitionMetadata->set_request_id(requestId);
     const SharedBuffer buffer = writeMessageWithSize(cmd);
     cmd.clear_partitionmetadata();
-    mutex.unlock();
     return buffer;
 }
 
 SharedBuffer Commands::newLookup(const std::string& topic, const bool authoritative, uint64_t requestId) {
     static BaseCommand cmd;
-    static boost::mutex mutex;
-    mutex.lock();
+    static std::mutex mutex;
+    std::lock_guard<std::mutex> lock(mutex);
     cmd.set_type(BaseCommand::LOOKUP);
     CommandLookupTopic* lookup = cmd.mutable_lookuptopic();
     lookup->set_topic(topic);
@@ -121,21 +120,19 @@ SharedBuffer Commands::newLookup(const std::string& topic, const bool authoritat
     lookup->set_request_id(requestId);
     const SharedBuffer buffer = writeMessageWithSize(cmd);
     cmd.clear_lookuptopic();
-    mutex.unlock();
     return buffer;
 }
 
 SharedBuffer Commands::newConsumerStats(uint64_t consumerId, uint64_t requestId) {
     static BaseCommand cmd;
-    static boost::mutex mutex;
-    mutex.lock();
+    static std::mutex mutex;
+    std::lock_guard<std::mutex> lock(mutex);
     cmd.set_type(BaseCommand::CONSUMER_STATS);
     CommandConsumerStats* consumerStats = cmd.mutable_consumerstats();
     consumerStats->set_consumer_id(consumerId);
     consumerStats->set_request_id(requestId);
     const SharedBuffer buffer = writeMessageWithSize(cmd);
     cmd.clear_consumerstats();
-    mutex.unlock();
     return buffer;
 }
 
diff --git a/pulsar-client-cpp/lib/CompressionCodecZLib.h b/pulsar-client-cpp/lib/CompressionCodecZLib.h
index 08fd0c9..41234ac 100644
--- a/pulsar-client-cpp/lib/CompressionCodecZLib.h
+++ b/pulsar-client-cpp/lib/CompressionCodecZLib.h
@@ -21,7 +21,6 @@
 
 #include "CompressionCodec.h"
 #include <zlib.h>
-#include <boost/thread/mutex.hpp>
 
 // Make symbol visible to unit tests
 #pragma GCC visibility push(default)
diff --git a/pulsar-client-cpp/lib/ConnectionPool.cc b/pulsar-client-cpp/lib/ConnectionPool.cc
index 36ce4a5..993731e 100644
--- a/pulsar-client-cpp/lib/ConnectionPool.cc
+++ b/pulsar-client-cpp/lib/ConnectionPool.cc
@@ -35,7 +35,7 @@ ConnectionPool::ConnectionPool(const ClientConfiguration& conf, ExecutorServiceP
 
 Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
     const std::string& logicalAddress, const std::string& physicalAddress) {
-    boost::unique_lock<boost::mutex> lock(mutex_);
+    std::unique_lock<std::mutex> lock(mutex_);
 
     if (poolConnections_) {
         PoolMap::iterator cnxIt = pool_.find(logicalAddress);
diff --git a/pulsar-client-cpp/lib/ConnectionPool.h b/pulsar-client-cpp/lib/ConnectionPool.h
index 471454b..50441a9 100644
--- a/pulsar-client-cpp/lib/ConnectionPool.h
+++ b/pulsar-client-cpp/lib/ConnectionPool.h
@@ -25,7 +25,7 @@
 
 #include <string>
 #include <map>
-#include <boost/thread/mutex.hpp>
+#include <mutex>
 #pragma GCC visibility push(default)
 namespace pulsar {
 
@@ -62,7 +62,7 @@ class ConnectionPool {
     typedef std::map<std::string, ClientConnectionWeakPtr> PoolMap;
     PoolMap pool_;
     bool poolConnections_;
-    boost::mutex mutex_;
+    std::mutex mutex_;
 
     friend class ConnectionPoolTest;
 };
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 11dec26..a2cabf8 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -497,7 +497,7 @@ void ConsumerImpl::internalListener() {
     }
     lock.unlock();
     Message msg;
-    if (!incomingMessages_.pop(msg, boost::posix_time::milliseconds(0))) {
+    if (!incomingMessages_.pop(msg, std::chrono::milliseconds(0))) {
         // This will only happen when the connection got reset and we cleared the queue
         return;
     }
@@ -572,7 +572,7 @@ void ConsumerImpl::receiveAsync(ReceiveCallback& callback) {
     stateLock.unlock();
 
     Lock lock(pendingReceiveMutex_);
-    if (incomingMessages_.pop(msg, milliseconds(0))) {
+    if (incomingMessages_.pop(msg, std::chrono::milliseconds(0))) {
         lock.unlock();
         messageProcessed(msg);
         unAckedMessageTrackerPtr_->add(msg.getMessageId());
@@ -637,7 +637,7 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) {
         return ResultInvalidConfiguration;
     }
 
-    if (incomingMessages_.pop(msg, milliseconds(timeout))) {
+    if (incomingMessages_.pop(msg, std::chrono::milliseconds(timeout))) {
         messageProcessed(msg);
         unAckedMessageTrackerPtr_->add(msg.getMessageId());
         return ResultOk;
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index 7492902..01aa55f 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -142,7 +142,7 @@ class ConsumerImpl : public ConsumerImplBase,
 
     Optional<MessageId> clearReceiveQueue();
 
-    boost::mutex mutexForReceiveWithZeroQueueSize;
+    std::mutex mutexForReceiveWithZeroQueueSize;
     const ConsumerConfiguration config_;
     const std::string subscription_;
     std::string originalSubscriptionName_;
@@ -163,7 +163,7 @@ class ConsumerImpl : public ConsumerImplBase,
     int32_t partitionIndex_;
     Promise<Result, ConsumerImplBaseWeakPtr> consumerCreatedPromise_;
     bool messageListenerRunning_;
-    boost::mutex messageListenerMutex_;
+    std::mutex messageListenerMutex_;
     CompressionCodecProvider compressionCodecProvider_;
     UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
     BatchAcknowledgementTracker batchAcknowledgementTracker_;
diff --git a/pulsar-client-cpp/lib/ExecutorService.h b/pulsar-client-cpp/lib/ExecutorService.h
index 2faceca..545abe5 100644
--- a/pulsar-client-cpp/lib/ExecutorService.h
+++ b/pulsar-client-cpp/lib/ExecutorService.h
@@ -24,7 +24,7 @@
 #include <boost/asio/ssl.hpp>
 #include <functional>
 #include <boost/noncopyable.hpp>
-#include <boost/thread/mutex.hpp>
+#include <mutex>
 
 #pragma GCC visibility push(default)
 
@@ -87,8 +87,8 @@ class ExecutorServiceProvider {
     typedef std::vector<ExecutorServicePtr> ExecutorList;
     ExecutorList executors_;
     int executorIdx_;
-    boost::mutex mutex_;
-    typedef boost::unique_lock<boost::mutex> Lock;
+    std::mutex mutex_;
+    typedef std::unique_lock<std::mutex> Lock;
 };
 
 typedef std::shared_ptr<ExecutorServiceProvider> ExecutorServiceProviderPtr;
diff --git a/pulsar-client-cpp/lib/Future.h b/pulsar-client-cpp/lib/Future.h
index c0e4c6e..d604645 100644
--- a/pulsar-client-cpp/lib/Future.h
+++ b/pulsar-client-cpp/lib/Future.h
@@ -20,21 +20,22 @@
 #define LIB_FUTURE_H_
 
 #include <functional>
-#include <boost/thread/mutex.hpp>
-#include <boost/thread/condition_variable.hpp>
+#include <mutex>
+#include <memory>
+#include <condition_variable>
 
 #include <list>
 
 #pragma GCC visibility push(default)
 
-typedef boost::unique_lock<boost::mutex> Lock;
+typedef std::unique_lock<std::mutex> Lock;
 
 namespace pulsar {
 
 template <typename Result, typename Type>
 struct InternalState {
-    boost::mutex mutex;
-    boost::condition_variable condition;
+    std::mutex mutex;
+    std::condition_variable condition;
     Result result;
     Type value;
     bool complete;
diff --git a/pulsar-client-cpp/lib/HandlerBase.h b/pulsar-client-cpp/lib/HandlerBase.h
index bcc824e..56f5ef3 100644
--- a/pulsar-client-cpp/lib/HandlerBase.h
+++ b/pulsar-client-cpp/lib/HandlerBase.h
@@ -90,12 +90,12 @@ class HandlerBase {
     ClientImplWeakPtr client_;
     const std::string topic_;
     ClientConnectionWeakPtr connection_;
-    boost::mutex mutex_;
-    boost::mutex pendingReceiveMutex_;
+    std::mutex mutex_;
+    std::mutex pendingReceiveMutex_;
     ptime creationTimestamp_;
 
     const TimeDuration operationTimeut_;
-    typedef boost::unique_lock<boost::mutex> Lock;
+    typedef std::unique_lock<std::mutex> Lock;
 
     enum State
     {
diff --git a/pulsar-client-cpp/lib/Latch.cc b/pulsar-client-cpp/lib/Latch.cc
index 3247940..c9cfab8 100644
--- a/pulsar-client-cpp/lib/Latch.cc
+++ b/pulsar-client-cpp/lib/Latch.cc
@@ -20,14 +20,6 @@
 
 namespace pulsar {
 
-struct CountIsZero {
-    const int& count_;
-
-    CountIsZero(const int& count) : count_(count) {}
-
-    bool operator()() const { return count_ == 0; }
-};
-
 Latch::Latch(int count) : state_(std::make_shared<InternalState>()) { state_->count = count; }
 
 void Latch::countdown() {
@@ -52,9 +44,4 @@ void Latch::wait() {
     state_->condition.wait(lock, CountIsZero(state_->count));
 }
 
-bool Latch::wait(const boost::posix_time::time_duration& timeout) {
-    Lock lock(state_->mutex);
-    return state_->condition.timed_wait(lock, timeout, CountIsZero(state_->count));
-}
-
 } /* namespace pulsar */
diff --git a/pulsar-client-cpp/lib/Latch.h b/pulsar-client-cpp/lib/Latch.h
index 360554c..f6e7dea 100644
--- a/pulsar-client-cpp/lib/Latch.h
+++ b/pulsar-client-cpp/lib/Latch.h
@@ -20,8 +20,8 @@
 #define LIB_LATCH_H_
 
 #include <memory>
-#include <boost/thread/mutex.hpp>
-#include <boost/thread/condition_variable.hpp>
+#include <mutex>
+#include <condition_variable>
 
 #pragma GCC visibility push(default)
 
@@ -35,18 +35,30 @@ class Latch {
 
     void wait();
 
-    bool wait(const boost::posix_time::time_duration& timeout);
+    template <typename Duration>
+    bool wait(const Duration& timeout) {
+        Lock lock(state_->mutex);
+        return state_->condition.wait_for(lock, timeout, CountIsZero(state_->count));
+    }
 
     int getCount();
 
    private:
     struct InternalState {
-        boost::mutex mutex;
-        boost::condition_variable condition;
+        std::mutex mutex;
+        std::condition_variable condition;
         int count;
     };
 
-    typedef boost::unique_lock<boost::mutex> Lock;
+    struct CountIsZero {
+        const int& count_;
+
+        CountIsZero(const int& count) : count_(count) {}
+
+        bool operator()() const { return count_ == 0; }
+    };
+
+    typedef std::unique_lock<std::mutex> Lock;
     std::shared_ptr<InternalState> state_;
 };
 typedef std::shared_ptr<Latch> LatchPtr;
diff --git a/pulsar-client-cpp/lib/LogUtils.h b/pulsar-client-cpp/lib/LogUtils.h
index 81de443..e4d94ca 100644
--- a/pulsar-client-cpp/lib/LogUtils.h
+++ b/pulsar-client-cpp/lib/LogUtils.h
@@ -19,9 +19,9 @@
 
 #pragma once
 
-#include <boost/thread/tss.hpp>
 #include <string>
 #include <sstream>
+#include <memory>
 
 #include <pulsar/Logger.h>
 
@@ -31,7 +31,7 @@ namespace pulsar {
 
 #define DECLARE_LOG_OBJECT()                                                                     \
     static pulsar::Logger* logger() {                                                            \
-        static boost::thread_specific_ptr<pulsar::Logger> threadSpecificLogPtr;                  \
+        static thread_local std::unique_ptr<pulsar::Logger> threadSpecificLogPtr;                \
         pulsar::Logger* ptr = threadSpecificLogPtr.get();                                        \
         if (PULSAR_UNLIKELY(!ptr)) {                                                             \
             std::string logger = pulsar::LogUtils::getLoggerName(__FILE__);                      \
diff --git a/pulsar-client-cpp/lib/LookupDataResult.h b/pulsar-client-cpp/lib/LookupDataResult.h
index ef9b091..b48b854 100644
--- a/pulsar-client-cpp/lib/LookupDataResult.h
+++ b/pulsar-client-cpp/lib/LookupDataResult.h
@@ -22,6 +22,9 @@
 #include <lib/Future.h>
 #include <pulsar/Result.h>
 
+#include <iostream>
+#include <memory>
+
 namespace pulsar {
 class LookupDataResult;
 typedef std::shared_ptr<LookupDataResult> LookupDataResultPtr;
diff --git a/pulsar-client-cpp/lib/LookupService.h b/pulsar-client-cpp/lib/LookupService.h
index 5a899a4..2fdb51a 100644
--- a/pulsar-client-cpp/lib/LookupService.h
+++ b/pulsar-client-cpp/lib/LookupService.h
@@ -25,6 +25,8 @@
 #include <lib/LogUtils.h>
 #include <lib/TopicName.h>
 
+#include <vector>
+
 namespace pulsar {
 typedef std::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;
 typedef Promise<Result, NamespaceTopicsPtr> NamespaceTopicsPromise;
diff --git a/pulsar-client-cpp/lib/MessageCrypto.h b/pulsar-client-cpp/lib/MessageCrypto.h
index 566313f..f44f37f 100644
--- a/pulsar-client-cpp/lib/MessageCrypto.h
+++ b/pulsar-client-cpp/lib/MessageCrypto.h
@@ -22,7 +22,7 @@
 #include <iostream>
 #include <map>
 #include <set>
-#include <boost/thread/mutex.hpp>
+#include <mutex>
 #include <boost/scoped_array.hpp>
 
 #include <openssl/ssl.h>
@@ -96,8 +96,8 @@ class MessageCrypto {
                  const CryptoKeyReaderPtr keyReader, SharedBuffer& decryptedPayload);
 
    private:
-    typedef boost::unique_lock<boost::mutex> Lock;
-    boost::mutex mutex_;
+    typedef std::unique_lock<std::mutex> Lock;
+    std::mutex mutex_;
 
     int dataKeyLen_;
     boost::scoped_array<unsigned char> dataKey_;
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 2b16f62..90aa9bf 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -498,7 +498,7 @@ Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) {
         return ResultInvalidConfiguration;
     }
 
-    if (messages_.pop(msg, milliseconds(timeout))) {
+    if (messages_.pop(msg, std::chrono::milliseconds(timeout))) {
         lock.unlock();
         unAckedMessageTrackerPtr_->add(msg.getMessageId());
         return ResultOk;
@@ -519,7 +519,7 @@ void MultiTopicsConsumerImpl::receiveAsync(ReceiveCallback& callback) {
     stateLock.unlock();
 
     Lock lock(pendingReceiveMutex_);
-    if (messages_.pop(msg, milliseconds(0))) {
+    if (messages_.pop(msg, std::chrono::milliseconds(0))) {
         lock.unlock();
         unAckedMessageTrackerPtr_->add(msg.getMessageId());
         callback(ResultOk, msg);
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index 24ef7dd..74350b9 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -23,7 +23,7 @@
 #include "BlockingQueue.h"
 #include <vector>
 #include <queue>
-#include <boost/thread/mutex.hpp>
+#include <mutex>
 
 #include "ConsumerImplBase.h"
 #include "lib/UnAckedMessageTrackerDisabled.h"
@@ -90,8 +90,8 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     typedef std::map<std::string, ConsumerImplPtr> ConsumerMap;
     ConsumerMap consumers_;
     std::map<std::string, int> topicsPartitions_;
-    boost::mutex mutex_;
-    boost::mutex pendingReceiveMutex_;
+    std::mutex mutex_;
+    std::mutex pendingReceiveMutex_;
     MultiTopicsConsumerState state_;
     std::shared_ptr<std::atomic<int>> numberTopicPartitions_;
     LookupServicePtr lookupServicePtr_;
diff --git a/pulsar-client-cpp/lib/ObjectPool.h b/pulsar-client-cpp/lib/ObjectPool.h
index 9fa6ca1..3cf39f2 100644
--- a/pulsar-client-cpp/lib/ObjectPool.h
+++ b/pulsar-client-cpp/lib/ObjectPool.h
@@ -20,10 +20,8 @@
 #define LIB_OBJECTPOOL_H_
 
 #include <algorithm>
-#include <boost/thread/locks.hpp>
-#include <boost/thread/mutex.hpp>
+#include <mutex>
 #include <memory>
-#include <boost/thread/tss.hpp>
 
 namespace pulsar {
 
@@ -34,7 +32,7 @@ class Allocator {
     class Impl {
        public:
         // cheap lock to acquire
-        static boost::mutex mutex_;
+        static std::mutex mutex_;
 
         // note: use std::forward_list<> when switching to C++11 mode
         struct Node {
@@ -59,7 +57,7 @@ class Allocator {
         void* pop() {
             if (!head_) {
                 // size = 0
-                boost::lock_guard<boost::mutex> lock(mutex_);
+                std::lock_guard<std::mutex> lock(mutex_);
 
                 if (!globalPool_) {
                     return NULL;
@@ -86,7 +84,7 @@ class Allocator {
                 bool deleteList = true;
                 {
                     // Move the entries to global pool
-                    boost::lock_guard<boost::mutex> lock(mutex_);
+                    std::lock_guard<std::mutex> lock(mutex_);
 
                     // If total node count reached max allowed cache limit,
                     // skip adding to global pool.
@@ -148,7 +146,7 @@ class Allocator {
         }
     };
 
-    static boost::thread_specific_ptr<Impl> implPtr_;
+    static thread_local std::unique_ptr<Impl> implPtr_;
     typedef T value_type;
     typedef size_t size_type;
     typedef T* pointer;
@@ -192,10 +190,10 @@ class Allocator {
 // typename Allocator<Type,MaxSize>::Impl is important else the compiler
 // doesn't understand that it is a type
 template <typename Type, int MaxSize>
-boost::thread_specific_ptr<typename Allocator<Type, MaxSize>::Impl> Allocator<Type, MaxSize>::implPtr_;
+thread_local std::unique_ptr<typename Allocator<Type, MaxSize>::Impl> Allocator<Type, MaxSize>::implPtr_;
 
 template <typename Type, int MaxSize>
-boost::mutex Allocator<Type, MaxSize>::Impl::mutex_;
+std::mutex Allocator<Type, MaxSize>::Impl::mutex_;
 
 template <typename Type, int MaxSize>
 struct Allocator<Type, MaxSize>::Impl::GlobalPool* Allocator<Type, MaxSize>::Impl::globalPool_;
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index 6ad847e..6edf06f 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -88,7 +88,7 @@ Result PartitionedConsumerImpl::receive(Message& msg, int timeout) {
         return ResultInvalidConfiguration;
     }
 
-    if (messages_.pop(msg, milliseconds(timeout))) {
+    if (messages_.pop(msg, std::chrono::milliseconds(timeout))) {
         unAckedMessageTrackerPtr_->add(msg.getMessageId());
         return ResultOk;
     } else {
@@ -108,7 +108,7 @@ void PartitionedConsumerImpl::receiveAsync(ReceiveCallback& callback) {
     stateLock.unlock();
 
     Lock lock(pendingReceiveMutex_);
-    if (messages_.pop(msg, milliseconds(0))) {
+    if (messages_.pop(msg, std::chrono::milliseconds(0))) {
         lock.unlock();
         unAckedMessageTrackerPtr_->add(msg.getMessageId());
         callback(ResultOk, msg);
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
index f4274b1..ec79529 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
@@ -23,7 +23,7 @@
 #include <vector>
 #include <queue>
 
-#include <boost/thread/mutex.hpp>
+#include <mutex>
 #include "ConsumerImplBase.h"
 #include "lib/UnAckedMessageTrackerDisabled.h"
 #include <lib/Latch.h>
@@ -80,8 +80,8 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
     const ConsumerConfiguration conf_;
     typedef std::vector<ConsumerImplPtr> ConsumerList;
     ConsumerList consumers_;
-    boost::mutex mutex_;
-    boost::mutex pendingReceiveMutex_;
+    std::mutex mutex_;
+    std::mutex pendingReceiveMutex_;
     PartitionedConsumerState state_;
     unsigned int unsubscribedSoFar_;
     BlockingQueue<Message> messages_;
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.h b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
index 11ca42e..72be5fe 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
@@ -20,7 +20,7 @@
 #include "ClientImpl.h"
 #include <vector>
 
-#include <boost/thread/mutex.hpp>
+#include <mutex>
 #include <pulsar/MessageRoutingPolicy.h>
 #include <pulsar/TopicMetadata.h>
 #include <lib/TopicName.h>
@@ -40,7 +40,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
     };
     const static std::string PARTITION_NAME_SUFFIX;
 
-    typedef boost::unique_lock<boost::mutex> Lock;
+    typedef std::unique_lock<std::mutex> Lock;
 
     PartitionedProducerImpl(ClientImplPtr ptr, const TopicNamePtr topicName, const unsigned int numPartitions,
                             const ProducerConfiguration& config);
@@ -110,7 +110,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
     MessageRoutingPolicyPtr routerPolicy_;
 
     // mutex_ is used to share state_, and numProducersCreated_
-    boost::mutex mutex_;
+    std::mutex mutex_;
 
     PartitionedProducerState state_;
 
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h
index 0b95811..5e84007 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -19,7 +19,7 @@
 #ifndef LIB_PRODUCERIMPL_H_
 #define LIB_PRODUCERIMPL_H_
 
-#include <boost/thread/mutex.hpp>
+#include <mutex>
 #include <boost/date_time/posix_time/ptime.hpp>
 
 #include "ClientImpl.h"
@@ -133,7 +133,7 @@ class ProducerImpl : public HandlerBase,
     bool encryptMessage(proto::MessageMetadata& metadata, SharedBuffer& payload,
                         SharedBuffer& encryptedPayload);
 
-    typedef boost::unique_lock<boost::mutex> Lock;
+    typedef std::unique_lock<std::mutex> Lock;
 
     ProducerConfiguration conf_;
 
diff --git a/pulsar-client-cpp/lib/RoundRobinMessageRouter.h b/pulsar-client-cpp/lib/RoundRobinMessageRouter.h
index 7c6f23c..8965349 100644
--- a/pulsar-client-cpp/lib/RoundRobinMessageRouter.h
+++ b/pulsar-client-cpp/lib/RoundRobinMessageRouter.h
@@ -22,7 +22,7 @@
 #include <pulsar/MessageRoutingPolicy.h>
 #include <pulsar/ProducerConfiguration.h>
 #include <pulsar/TopicMetadata.h>
-#include <boost/thread/mutex.hpp>
+#include <mutex>
 #include "Hash.h"
 #include "MessageRouterBase.h"
 
@@ -35,10 +35,10 @@ class RoundRobinMessageRouter : public MessageRouterBase {
     virtual int getPartition(const Message& msg, const TopicMetadata& topicMetadata);
 
    private:
-    boost::mutex mutex_;
+    std::mutex mutex_;
     unsigned int prevPartition_;
 };
-typedef boost::unique_lock<boost::mutex> Lock;
+typedef std::unique_lock<std::mutex> Lock;
 }  // namespace pulsar
 #pragma GCC visibility pop
 #endif  // PULSAR_RR_MESSAGE_ROUTER_HEADER_
diff --git a/pulsar-client-cpp/lib/TopicName.cc b/pulsar-client-cpp/lib/TopicName.cc
index 828c971..84bff73 100644
--- a/pulsar-client-cpp/lib/TopicName.cc
+++ b/pulsar-client-cpp/lib/TopicName.cc
@@ -34,10 +34,10 @@
 DECLARE_LOG_OBJECT()
 namespace pulsar {
 
-typedef boost::unique_lock<boost::mutex> Lock;
+typedef std::unique_lock<std::mutex> Lock;
 // static members
 CURL* TopicName::curl = NULL;
-boost::mutex TopicName::curlHandleMutex;
+std::mutex TopicName::curlHandleMutex;
 
 CURL* TopicName::getCurlHandle() {
     if (curl == NULL) {
diff --git a/pulsar-client-cpp/lib/TopicName.h b/pulsar-client-cpp/lib/TopicName.h
index ad58116..fc8dccb 100644
--- a/pulsar-client-cpp/lib/TopicName.h
+++ b/pulsar-client-cpp/lib/TopicName.h
@@ -24,7 +24,7 @@
 
 #include <string>
 #include <curl/curl.h>
-#include <boost/thread/mutex.hpp>
+#include <mutex>
 
 #pragma GCC visibility push(default)
 
@@ -59,7 +59,7 @@ class TopicName : public ServiceUnitId {
    private:
     static CURL* getCurlHandle();
     static CURL* curl;
-    static boost::mutex curlHandleMutex;
+    static std::mutex curlHandleMutex;
     static bool parse(const std::string& topicName, std::string& domain, std::string& property,
                       std::string& cluster, std::string& namespacePortion, std::string& localName);
     TopicName();
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
index ba9fc97..248a60d 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
@@ -40,7 +40,7 @@ void UnAckedMessageTrackerEnabled::timeoutHandler() {
 }
 
 void UnAckedMessageTrackerEnabled::timeoutHandlerHelper() {
-    boost::unique_lock<boost::mutex> acquire(lock_);
+    std::lock_guard<std::mutex> acquire(lock_);
     LOG_DEBUG("UnAckedMessageTrackerEnabled::timeoutHandlerHelper invoked for consumerPtr_ "
               << consumerReference_.getName().c_str());
     if (!oldSet_.empty()) {
@@ -62,28 +62,28 @@ UnAckedMessageTrackerEnabled::UnAckedMessageTrackerEnabled(long timeoutMs, const
 }
 
 bool UnAckedMessageTrackerEnabled::add(const MessageId& m) {
-    boost::unique_lock<boost::mutex> acquire(lock_);
+    std::lock_guard<std::mutex> acquire(lock_);
     oldSet_.erase(m);
     return currentSet_.insert(m).second;
 }
 
 bool UnAckedMessageTrackerEnabled::isEmpty() {
-    boost::unique_lock<boost::mutex> acquire(lock_);
+    std::lock_guard<std::mutex> acquire(lock_);
     return oldSet_.empty() && currentSet_.empty();
 }
 
 bool UnAckedMessageTrackerEnabled::remove(const MessageId& m) {
-    boost::unique_lock<boost::mutex> acquire(lock_);
+    std::lock_guard<std::mutex> acquire(lock_);
     return oldSet_.erase(m) || currentSet_.erase(m);
 }
 
 long UnAckedMessageTrackerEnabled::size() {
-    boost::unique_lock<boost::mutex> acquire(lock_);
+    std::lock_guard<std::mutex> acquire(lock_);
     return oldSet_.size() + currentSet_.size();
 }
 
 void UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) {
-    boost::unique_lock<boost::mutex> acquire(lock_);
+    std::lock_guard<std::mutex> acquire(lock_);
     for (std::set<MessageId>::iterator it = oldSet_.begin(); it != oldSet_.end();) {
         if (*it < msgId && it->partition() == msgId.partition()) {
             oldSet_.erase(it++);
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
index 7bea00d..ea92305 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
@@ -19,6 +19,9 @@
 #ifndef LIB_UNACKEDMESSAGETRACKERENABLED_H_
 #define LIB_UNACKEDMESSAGETRACKERENABLED_H_
 #include "lib/UnAckedMessageTrackerInterface.h"
+
+#include <mutex>
+
 namespace pulsar {
 
 class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
@@ -40,7 +43,7 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
     long size();
     std::set<MessageId> currentSet_;
     std::set<MessageId> oldSet_;
-    boost::mutex lock_;
+    std::mutex lock_;
     DeadlineTimerPtr timer_;
     ConsumerImplBase& consumerReference_;
     ClientImplPtr client_;
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h b/pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h
index e03b0a2..ea49912 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h
@@ -24,15 +24,12 @@
 #include <algorithm>
 #include <utility>
 #include "pulsar/MessageId.h"
-#include <boost/thread/locks.hpp>
-#include <boost/thread/shared_mutex.hpp>
 #include "lib/ClientImpl.h"
 #include "lib/ConsumerImplBase.h"
 #include <boost/bind.hpp>
 #include <boost/asio.hpp>
 #include <lib/LogUtils.h>
 #include "lib/PulsarApi.pb.h"
-#include <boost/thread/recursive_mutex.hpp>
 #include <boost/asio/error.hpp>
 namespace pulsar {
 
diff --git a/pulsar-client-cpp/lib/UnboundedBlockingQueue.h b/pulsar-client-cpp/lib/UnboundedBlockingQueue.h
index 69325f0..841be30 100644
--- a/pulsar-client-cpp/lib/UnboundedBlockingQueue.h
+++ b/pulsar-client-cpp/lib/UnboundedBlockingQueue.h
@@ -19,8 +19,8 @@
 #ifndef LIB_UNBOUNDEDBLOCKINGQUEUE_H_
 #define LIB_UNBOUNDEDBLOCKINGQUEUE_H_
 
-#include <boost/thread/mutex.hpp>
-#include <boost/thread/condition.hpp>
+#include <mutex>
+#include <condition_variable>
 #include <boost/circular_buffer.hpp>
 // For struct QueueNotEmpty
 #include "BlockingQueue.h"
@@ -72,10 +72,10 @@ class UnboundedBlockingQueue {
         lock.unlock();
     }
 
-    bool pop(T& value, const boost::posix_time::time_duration& timeout) {
+    template <typename Duration>
+    bool pop(T& value, const Duration& timeout) {
         Lock lock(mutex_);
-        if (!queueEmptyCondition_.timed_wait(lock, timeout,
-                                             QueueNotEmpty<UnboundedBlockingQueue<T> >(*this))) {
+        if (!queueEmptyCondition_.wait_for(lock, timeout, QueueNotEmpty<UnboundedBlockingQueue<T> >(*this))) {
             return false;
         }
 
@@ -136,11 +136,11 @@ class UnboundedBlockingQueue {
    private:
     bool isEmptyNoMutex() const { return queue_.empty(); }
 
-    mutable boost::mutex mutex_;
-    boost::condition_variable queueEmptyCondition_;
+    mutable std::mutex mutex_;
+    std::condition_variable queueEmptyCondition_;
     Container queue_;
 
-    typedef boost::unique_lock<boost::mutex> Lock;
+    typedef std::unique_lock<std::mutex> Lock;
     friend struct QueueNotEmpty<UnboundedBlockingQueue<T> >;
 };
 
diff --git a/pulsar-client-cpp/lib/Utils.h b/pulsar-client-cpp/lib/Utils.h
index d119951..620cf60 100644
--- a/pulsar-client-cpp/lib/Utils.h
+++ b/pulsar-client-cpp/lib/Utils.h
@@ -23,6 +23,9 @@
 
 #include "Future.h"
 
+#include <map>
+#include <iostream>
+
 namespace pulsar {
 
 struct WaitForCallback {
diff --git a/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc b/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc
index 2212b5b..a4aa7aa 100644
--- a/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc
+++ b/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc
@@ -38,6 +38,8 @@
 #include <boost/archive/iterators/base64_from_binary.hpp>
 #include <boost/archive/iterators/transform_width.hpp>
 
+#include <mutex>
+
 DECLARE_LOG_OBJECT()
 
 namespace pulsar {
@@ -254,14 +256,14 @@ static size_t curlWriteCallback(void *contents, size_t size, size_t nmemb, void
     return size * nmemb;
 }
 
-static boost::mutex cacheMtx_;
+static std::mutex cacheMtx_;
 const std::string ZTSClient::getRoleToken() const {
     RoleToken roleToken;
     std::string cacheKey = "p=" + tenantDomain_ + "." + tenantService_ + ";d=" + providerDomain_;
 
     // locked block
     {
-        boost::lock_guard<boost::mutex> lock(cacheMtx_);
+        std::lock_guard<std::mutex> lock(cacheMtx_);
         roleToken = roleTokenCache_[cacheKey];
     }
 
@@ -328,7 +330,7 @@ const std::string ZTSClient::getRoleToken() const {
                 }
                 roleToken.token = root["token"].asString();
                 roleToken.expiryTime = root["expiryTime"].asUInt();
-                boost::lock_guard<boost::mutex> lock(cacheMtx_);
+                std::lock_guard<std::mutex> lock(cacheMtx_);
                 roleTokenCache_[cacheKey] = roleToken;
                 LOG_DEBUG("Got role token " << roleToken.token)
             } else {
diff --git a/pulsar-client-cpp/lib/auth/athenz/ZTSClient.h b/pulsar-client-cpp/lib/auth/athenz/ZTSClient.h
index 468ed02..d9b9abe 100644
--- a/pulsar-client-cpp/lib/auth/athenz/ZTSClient.h
+++ b/pulsar-client-cpp/lib/auth/athenz/ZTSClient.h
@@ -19,7 +19,7 @@
 #include <string>
 #include <map>
 #include <lib/LogUtils.h>
-#include <boost/thread.hpp>
+
 #pragma GCC visibility push(default)
 
 namespace pulsar {
diff --git a/pulsar-client-cpp/lib/stats/ConsumerStatsImpl.h b/pulsar-client-cpp/lib/stats/ConsumerStatsImpl.h
index 646005d..4c8e8f6 100644
--- a/pulsar-client-cpp/lib/stats/ConsumerStatsImpl.h
+++ b/pulsar-client-cpp/lib/stats/ConsumerStatsImpl.h
@@ -39,7 +39,7 @@ class ConsumerStatsImpl : public ConsumerStatsBase {
 
     std::string consumerStr_;
     DeadlineTimerPtr timer_;
-    boost::mutex mutex_;
+    std::mutex mutex_;
     unsigned int statsIntervalInSeconds_;
 
     friend std::ostream& operator<<(std::ostream&, const ConsumerStatsImpl&);
diff --git a/pulsar-client-cpp/lib/stats/ProducerStatsImpl.h b/pulsar-client-cpp/lib/stats/ProducerStatsImpl.h
index 6929efd..2212809 100644
--- a/pulsar-client-cpp/lib/stats/ProducerStatsImpl.h
+++ b/pulsar-client-cpp/lib/stats/ProducerStatsImpl.h
@@ -36,7 +36,8 @@
 #include <boost/array.hpp>
 
 #include <boost/date_time/local_time/local_time.hpp>
-#include <boost/array.hpp>
+#include <memory>
+#include <mutex>
 #include <iostream>
 #include <vector>
 #include <lib/Utils.h>
@@ -49,7 +50,7 @@ typedef boost::accumulators::accumulator_set<
     boost::accumulators::stats<boost::accumulators::tag::mean, boost::accumulators::tag::extended_p_square> >
     LatencyAccumulator;
 
-class ProducerStatsImpl : public boost::enable_shared_from_this<ProducerStatsImpl>, public ProducerStatsBase {
+class ProducerStatsImpl : public std::enable_shared_from_this<ProducerStatsImpl>, public ProducerStatsBase {
    private:
     unsigned long numMsgsSent_;
     unsigned long numBytesSent_;
@@ -63,7 +64,7 @@ class ProducerStatsImpl : public boost::enable_shared_from_this<ProducerStatsImp
 
     std::string producerStr_;
     DeadlineTimerPtr timer_;
-    boost::mutex mutex_;
+    std::mutex mutex_;
     unsigned int statsIntervalInSeconds_;
 
     friend std::ostream& operator<<(std::ostream&, const ProducerStatsImpl&);
diff --git a/pulsar-client-cpp/perf/PerfConsumer.cc b/pulsar-client-cpp/perf/PerfConsumer.cc
index 06b168c..db316ad 100644
--- a/pulsar-client-cpp/perf/PerfConsumer.cc
+++ b/pulsar-client-cpp/perf/PerfConsumer.cc
@@ -23,10 +23,11 @@ DECLARE_LOG_OBJECT()
 #include <thread>
 #include <iostream>
 #include <fstream>
+#include <mutex>
 
 using namespace std::chrono;
 
-#include <boost/thread.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
 #include <boost/bind.hpp>
 #include <boost/filesystem.hpp>
 #include <boost/program_options.hpp>
@@ -130,8 +131,8 @@ void handleAckComplete(Result) {
 }
 
 
-boost::mutex mutex;
-typedef boost::unique_lock<boost::mutex> Lock;
+std::mutex mutex;
+typedef std::unique_lock<std::mutex> Lock;
 typedef accumulator_set<uint64_t, stats<tag::mean, tag::p_square_quantile> > LatencyAccumulator;
 LatencyAccumulator e2eLatencyAccumulator(quantile_probability = 0.99);
 
diff --git a/pulsar-client-cpp/perf/PerfProducer.cc b/pulsar-client-cpp/perf/PerfProducer.cc
index 8f9704b..e36728b 100644
--- a/pulsar-client-cpp/perf/PerfProducer.cc
+++ b/pulsar-client-cpp/perf/PerfProducer.cc
@@ -19,7 +19,8 @@
 #include <lib/LogUtils.h>
 DECLARE_LOG_OBJECT()
 
-#include <boost/thread.hpp>
+#include <mutex>
+
 #include <boost/bind.hpp>
 #include <boost/filesystem.hpp>
 
@@ -29,9 +30,10 @@ DECLARE_LOG_OBJECT()
 #include <boost/accumulators/statistics/p_square_quantile.hpp>
 #include <boost/program_options/variables_map.hpp>
 #include <boost/program_options.hpp>
-#include <boost/thread.hpp>
+#include <thread>
 namespace po = boost::program_options;
 
+#include <atomic>
 #include <iostream>
 #include <fstream>
 #include <vector>
@@ -117,10 +119,10 @@ class EncKeyReader: public CryptoKeyReader {
 typedef accumulator_set<uint64_t, stats<tag::mean, tag::p_square_quantile> > LatencyAccumulator;
 LatencyAccumulator e2eLatencyAccumulator(quantile_probability = 0.99);
 std::vector<pulsar::Producer> producerList;
-std::vector<boost::thread> threadList;
+std::vector<std::thread> threadList;
 
-boost::mutex mutex;
-typedef boost::unique_lock<boost::mutex> Lock;
+std::mutex mutex;
+typedef std::unique_lock<std::mutex> Lock;
 
 typedef std::chrono::high_resolution_clock Clock;
 
@@ -136,29 +138,30 @@ void sendCallback(pulsar::Result result, const pulsar::Message& msg, Clock::time
 
 // Start a pulsar producer on a topic and keep producing messages
 void runProducer(const Arguments& args, std::string topicName, int threadIndex,
-                 RateLimiterPtr limiter, pulsar::Producer& producer) {
+                 RateLimiterPtr limiter, pulsar::Producer& producer,
+                 const std::atomic<bool>& exitCondition) {
     LOG_INFO("Producing messages for topic = " << topicName << ", threadIndex = " << threadIndex);
 
-    boost::scoped_array<char> payload(new char[args.msgSize]);
+    std::unique_ptr<char[]> payload(new char[args.msgSize]);
     memset(payload.get(), 0, args.msgSize);
     pulsar::MessageBuilder builder;
-    try {
-        while (true) {
-            if (args.rate != -1) {
-                limiter->acquire();
-            }
-            pulsar::Message msg = builder.create().setAllocatedContent(payload.get(), args.msgSize).build();
 
-            producer.sendAsync(msg, boost::bind(sendCallback, _1, _2, Clock::now()));
-            boost::this_thread::interruption_point();
+    while (true) {
+        if (args.rate != -1) {
+            limiter->acquire();
+        }
+        pulsar::Message msg = builder.create().setAllocatedContent(payload.get(), args.msgSize).build();
+
+        producer.sendAsync(msg, boost::bind(sendCallback, _1, _2, Clock::now()));
+        if (exitCondition) {
+            LOG_INFO("Thread interrupted. Exiting producer thread.");
+            break;
         }
-    } catch(const boost::thread_interrupted& e) {
-        // Thread interruption request received, break the loop
-        LOG_INFO("Thread interrupted. Exiting thread.");
     }
 }
 
-void startPerfProducer(const Arguments& args, pulsar::ProducerConfiguration &producerConf, pulsar::Client &client) {
+void startPerfProducer(const Arguments& args, pulsar::ProducerConfiguration &producerConf, pulsar::Client &client,
+        const std::atomic<bool>& exitCondition) {
     RateLimiterPtr limiter;
     if (args.rate != -1) {
         limiter = std::make_shared<pulsar::RateLimiter>(args.rate);
@@ -181,7 +184,9 @@ void startPerfProducer(const Arguments& args, pulsar::ProducerConfiguration &pro
             }
 
             for (int k = 0; k < args.numOfThreadsPerProducer; k++) {
-                threadList.push_back(boost::thread(boost::bind(runProducer, args, topic, k, limiter, producerList[i*args.numProducers + j])));
+                threadList.push_back(std::thread(
+                        boost::bind(runProducer, args, topic, k, limiter, producerList[i * args.numProducers + j],
+                                    std::cref(exitCondition))));
             }
         }
     }
@@ -364,7 +369,9 @@ int main(int argc, char** argv) {
     }
 
     pulsar::Client client(pulsar::PulsarFriend::getClient(args.serviceURL, conf, false));
-    startPerfProducer(args, producerConf, client);
+
+    std::atomic<bool> exitCondition(false);  // must start false; default init is indeterminate pre-C++20
+    startPerfProducer(args, producerConf, client, exitCondition);
 
     Clock::time_point oldTime = Clock::now();
     unsigned long totalMessagesProduced = 0;
@@ -392,9 +399,9 @@ int main(int argc, char** argv) {
         oldTime = now;
     }
     LOG_INFO("Total messagesProduced = " << totalMessagesProduced + messagesProduced);
-    for (int i = 0; i < threadList.size(); i++) {
-        threadList[i].interrupt();
-        threadList[i].join();
+    exitCondition = true;
+    for (auto& thread : threadList) {
+        thread.join();
     }
     // Waiting for the sendCallbacks To Complete
     usleep(2 * 1000 * 1000);
diff --git a/pulsar-client-cpp/perf/RateLimiter.h b/pulsar-client-cpp/perf/RateLimiter.h
index 5fa8f81..6812a4d 100644
--- a/pulsar-client-cpp/perf/RateLimiter.h
+++ b/pulsar-client-cpp/perf/RateLimiter.h
@@ -21,7 +21,7 @@
 
 #include <chrono>
 #include <thread>
-#include <boost/thread/mutex.hpp>
+#include <mutex>
 
 namespace pulsar {
 
@@ -42,8 +42,8 @@ class RateLimiter {
     long storedPermits_;
     double maxPermits_;
     Clock::time_point nextFree_;
-    boost::mutex mutex_;
-    typedef boost::unique_lock<boost::mutex> Lock;
+    std::mutex mutex_;
+    typedef std::unique_lock<std::mutex> Lock;
 };
 
 RateLimiter::RateLimiter(double rate)
diff --git a/pulsar-client-cpp/tests/AuthPluginTest.cc b/pulsar-client-cpp/tests/AuthPluginTest.cc
index 75683a8..b2169e0 100644
--- a/pulsar-client-cpp/tests/AuthPluginTest.cc
+++ b/pulsar-client-cpp/tests/AuthPluginTest.cc
@@ -21,7 +21,7 @@
 #include <pulsar/Client.h>
 #include <boost/asio.hpp>
 #include <boost/algorithm/string.hpp>
-#include <boost/thread.hpp>
+#include <thread>
 #include <lib/LogUtils.h>
 
 #include "lib/Future.h"
@@ -196,7 +196,7 @@ void mockZTS(int port) {
 }  // namespace testAthenz
 
 TEST(AuthPluginTest, testAthenz) {
-    boost::thread zts(boost::bind(&testAthenz::mockZTS, 9999));
+    std::thread zts(std::bind(&testAthenz::mockZTS, 9999));
     pulsar::AuthenticationDataPtr data;
     std::string params = R"({
         "tenantDomain": "pulsar.test.tenant",
@@ -266,7 +266,7 @@ TEST(AuthPluginTest, testAuthFactoryTls) {
 }
 
 TEST(AuthPluginTest, testAuthFactoryAthenz) {
-    boost::thread zts(boost::bind(&testAthenz::mockZTS, 9998));
+    std::thread zts(std::bind(&testAthenz::mockZTS, 9998));
     pulsar::AuthenticationDataPtr data;
     std::string params = R"({
         "tenantDomain": "pulsar.test2.tenant",
diff --git a/pulsar-client-cpp/tests/AuthTokenTest.cc b/pulsar-client-cpp/tests/AuthTokenTest.cc
index dad9a0e..1b136b0 100644
--- a/pulsar-client-cpp/tests/AuthTokenTest.cc
+++ b/pulsar-client-cpp/tests/AuthTokenTest.cc
@@ -23,7 +23,6 @@
 #include <pulsar/Client.h>
 #include <boost/asio.hpp>
 #include <boost/algorithm/string.hpp>
-#include <boost/thread.hpp>
 #include <lib/LogUtils.h>
 
 #include <string>
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 17f01b4..4f04cb7 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -24,10 +24,8 @@
 #include <lib/Latch.h>
 #include <sstream>
 #include "boost/date_time/posix_time/posix_time.hpp"
-#include "boost/enable_shared_from_this.hpp"
 #include "CustomRoutingPolicy.h"
-#include <boost/thread.hpp>
-#include <boost/thread/mutex.hpp>
+#include <mutex>
 #include <lib/TopicName.h>
 #include "PulsarFriend.h"
 #include "HttpHelper.h"
@@ -41,7 +39,7 @@ DECLARE_LOG_OBJECT()
 
 using namespace pulsar;
 
-boost::mutex mutex_;
+std::mutex mutex_;
 static int globalTestBatchMessagesCounter = 0;
 static int globalCount = 0;
 static long globalResendMessageCount = 0;
@@ -60,7 +58,7 @@ static void messageListenerFunctionWithoutAck(Consumer consumer, const Message&
 }
 
 static void sendCallBack(Result r, const Message& msg, std::string prefix, int* count) {
-    static boost::mutex sendMutex_;
+    static std::mutex sendMutex_;
     sendMutex_.lock();
     ASSERT_EQ(r, ResultOk);
     std::string messageContent = prefix + std::to_string(*count);
@@ -72,7 +70,7 @@ static void sendCallBack(Result r, const Message& msg, std::string prefix, int*
 
 static void receiveCallBack(Result r, const Message& msg, std::string& messageContent, bool checkContent,
                             bool* isFailed, int* count) {
-    static boost::mutex receiveMutex_;
+    static std::mutex receiveMutex_;
     receiveMutex_.lock();
 
     if (r == ResultOk) {
@@ -1525,7 +1523,7 @@ TEST(BasicEndToEndTest, testUnAckedMessageTimeoutListener) {
     result = producer.send(msg);
     ASSERT_EQ(ResultOk, result);
 
-    ASSERT_TRUE(latch.wait(milliseconds(30 * 1000)));
+    ASSERT_TRUE(latch.wait(std::chrono::seconds(30)));
     ASSERT_GE(globalCount, 2);
 
     consumer.unsubscribe();
diff --git a/pulsar-client-cpp/tests/BatchMessageTest.cc b/pulsar-client-cpp/tests/BatchMessageTest.cc
index de19ed9..3ebcabb 100644
--- a/pulsar-client-cpp/tests/BatchMessageTest.cc
+++ b/pulsar-client-cpp/tests/BatchMessageTest.cc
@@ -25,7 +25,6 @@
 #include <sstream>
 #include "boost/date_time/posix_time/posix_time.hpp"
 #include "CustomRoutingPolicy.h"
-#include <boost/thread.hpp>
 #include "lib/Future.h"
 #include "lib/Utils.h"
 #include <ctime>
diff --git a/pulsar-client-cpp/tests/BlockingQueueTest.cc b/pulsar-client-cpp/tests/BlockingQueueTest.cc
index 7073b40..4047e5e 100644
--- a/pulsar-client-cpp/tests/BlockingQueueTest.cc
+++ b/pulsar-client-cpp/tests/BlockingQueueTest.cc
@@ -19,20 +19,17 @@
 #include <gtest/gtest.h>
 #include <lib/BlockingQueue.h>
 
-#include <boost/thread.hpp>
-#include <boost/date_time/posix_time/posix_time.hpp>
-
-using namespace boost::posix_time;
+#include <thread>
 
 class ProducerWorker {
    private:
-    boost::thread producerThread_;
+    std::thread producerThread_;
     BlockingQueue<int>& queue_;
 
    public:
     ProducerWorker(BlockingQueue<int>& queue) : queue_(queue) {}
 
-    void produce(int number) { producerThread_ = boost::thread(&ProducerWorker::pushNumbers, this, number); }
+    void produce(int number) { producerThread_ = std::thread(&ProducerWorker::pushNumbers, this, number); }
 
     void pushNumbers(int number) {
         for (int i = 1; i <= number; i++) {
@@ -45,13 +42,13 @@ class ProducerWorker {
 
 class ConsumerWorker {
    private:
-    boost::thread consumerThread_;
+    std::thread consumerThread_;
     BlockingQueue<int>& queue_;
 
    public:
     ConsumerWorker(BlockingQueue<int>& queue) : queue_(queue) {}
 
-    void consume(int number) { consumerThread_ = boost::thread(&ConsumerWorker::popNumbers, this, number); }
+    void consume(int number) { consumerThread_ = std::thread(&ConsumerWorker::popNumbers, this, number); }
 
     void popNumbers(int number) {
         for (int i = 1; i <= number; i++) {
@@ -148,8 +145,8 @@ TEST(BlockingQueueTest, testTimeout) {
     size_t size = 5;
     BlockingQueue<int> queue(size);
     int value;
-    bool popReturn = queue.pop(value, seconds(1));
-    boost::this_thread::sleep(seconds(2));
+    bool popReturn = queue.pop(value, std::chrono::seconds(1));
+    std::this_thread::sleep_for(std::chrono::seconds(2));
     ASSERT_FALSE(popReturn);
 }
 
diff --git a/pulsar-client-cpp/tests/ConsumerStatsTest.cc b/pulsar-client-cpp/tests/ConsumerStatsTest.cc
index be24863..133f94c 100644
--- a/pulsar-client-cpp/tests/ConsumerStatsTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerStatsTest.cc
@@ -20,9 +20,7 @@
 #include <pulsar/Client.h>
 #include <lib/LogUtils.h>
 #include <lib/Commands.h>
-#include "boost/date_time/posix_time/posix_time.hpp"
 #include "CustomRoutingPolicy.h"
-#include <boost/thread.hpp>
 #include "lib/Future.h"
 #include "lib/Utils.h"
 #include "PulsarFriend.h"
@@ -306,5 +304,5 @@ TEST(ConsumerStatsTest, testAsyncCallOnPartitionedTopic) {
     consumer.getBrokerConsumerStatsAsync(boost::bind(partitionedCallbackFunction, _1, _2, 10, latch, 3));
 
     // Wait for ten seconds only
-    ASSERT_TRUE(latch.wait(milliseconds(10 * 1000)));
+    ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
 }
diff --git a/pulsar-client-cpp/tests/LatchTest.cc b/pulsar-client-cpp/tests/LatchTest.cc
index 38e19ee..9f332fb 100644
--- a/pulsar-client-cpp/tests/LatchTest.cc
+++ b/pulsar-client-cpp/tests/LatchTest.cc
@@ -18,30 +18,28 @@
  */
 #include <gtest/gtest.h>
 #include <lib/Latch.h>
-#include <boost/thread.hpp>
-#include <boost/date_time/posix_time/posix_time.hpp>
+#include <thread>
 #include "LogUtils.h"
 
 DECLARE_LOG_OBJECT()
 
 using namespace pulsar;
-using namespace boost::posix_time;
 
 class Service {
    private:
     std::string serviceName_;
-    time_duration sleepDuration_;
+    std::chrono::milliseconds sleepDuration_;
     Latch latch_;
-    boost::thread thread_;
+    std::thread thread_;
 
    public:
-    Service(const std::string& serviceName, time_duration sleepDuration, const Latch& latch)
+    Service(const std::string& serviceName, std::chrono::milliseconds sleepDuration, const Latch& latch)
         : serviceName_(serviceName), sleepDuration_(sleepDuration), latch_(latch) {
-        thread_ = boost::thread(&Service::run, this);
+        thread_ = std::thread(&Service::run, this);
     }
 
     void run() {
-        boost::this_thread::sleep(sleepDuration_);
+        std::this_thread::sleep_for(sleepDuration_);
         LOG_INFO("Service " << serviceName_ << " is up");
         latch_.countdown();
     }
@@ -51,17 +49,17 @@ class Service {
 
 TEST(LatchTest, testCountDown) {
     Latch latch(3);
-    Service service1("service1", millisec(50), latch);
-    Service service2("service2", millisec(30), latch);
-    Service service3("service3", millisec(20), latch);
+    Service service1("service1", std::chrono::milliseconds(50), latch);
+    Service service2("service2", std::chrono::milliseconds(30), latch);
+    Service service3("service3", std::chrono::milliseconds(20), latch);
     latch.wait();
 }
 
 TEST(LatchTest, testLatchCount) {
     Latch latch(3);
-    Service service1("service1", millisec(50), latch);
-    Service service2("service2", millisec(30), latch);
-    Service service3("service3", millisec(20), latch);
+    Service service1("service1", std::chrono::milliseconds(50), latch);
+    Service service2("service2", std::chrono::milliseconds(30), latch);
+    Service service3("service3", std::chrono::milliseconds(20), latch);
     ASSERT_EQ(3, latch.getCount());
     latch.wait();
     ASSERT_EQ(0, latch.getCount());
@@ -70,17 +68,17 @@ TEST(LatchTest, testLatchCount) {
 TEST(LatchTest, testTimedWait) {
     // Wait for 7 seconds which is more than the maximum sleep time (5 seconds)
     Latch latch1(3);
-    Service service1("service1", millisec(50), latch1);
-    Service service2("service2", millisec(30), latch1);
-    Service service3("service3", millisec(50), latch1);
-    ASSERT_TRUE(latch1.wait(millisec(70)));
+    Service service1("service1", std::chrono::milliseconds(50), latch1);
+    Service service2("service2", std::chrono::milliseconds(30), latch1);
+    Service service3("service3", std::chrono::milliseconds(50), latch1);
+    ASSERT_TRUE(latch1.wait(std::chrono::milliseconds(70)));
 
     // Wait for 3 seconds which is less than the maximum sleep time (5 seconds)
     Latch latch2(3);
-    Service service4("service4", millisec(50), latch2);
-    Service service5("service5", millisec(30), latch2);
-    Service service6("service6", millisec(50), latch2);
-    ASSERT_FALSE(latch2.wait(millisec(30)));
+    Service service4("service4", std::chrono::milliseconds(50), latch2);
+    Service service5("service5", std::chrono::milliseconds(30), latch2);
+    Service service6("service6", std::chrono::milliseconds(50), latch2);
+    ASSERT_FALSE(latch2.wait(std::chrono::milliseconds(30)));
 
     // After the assert is passed and Service is destroyed because of join, the
     // main thread would not exit until service4 thread is returned.
diff --git a/pulsar-client-cpp/tests/ZeroQueueSizeTest.cc b/pulsar-client-cpp/tests/ZeroQueueSizeTest.cc
index 890cee6..3b22a30 100644
--- a/pulsar-client-cpp/tests/ZeroQueueSizeTest.cc
+++ b/pulsar-client-cpp/tests/ZeroQueueSizeTest.cc
@@ -105,7 +105,7 @@ TEST(ZeroQueueSizeTest, testMessageListener) {
         ASSERT_EQ(ResultOk, result);
     }
 
-    ASSERT_TRUE(latch.wait(milliseconds(30 * 1000)));
+    ASSERT_TRUE(latch.wait(std::chrono::seconds(30)));
     ASSERT_EQ(globalCount, totalMessages);
 
     consumer.unsubscribe();