You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/01/30 21:08:52 UTC
[pulsar] branch master updated: Replace boost::mutex and
boost::thread with std::mutex and std::thread (#3461)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1d14680 Replace boost::mutex and boost::thread with std::mutex and std::thread (#3461)
1d14680 is described below
commit 1d14680c8bcdae4159afc230b12c22545f855a58
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Jan 30 13:08:47 2019 -0800
Replace boost::mutex and boost::thread with std::mutex and std::thread (#3461)
* Replace boost::mutex and boost::thread with std::mutex and std::thread
* Added missing mutex header
* Added missing vector header
* Removed extra tab
* Added missing <atomic> include
* Missing <memory> includes
---
pulsar-client-cpp/CMakeLists.txt | 2 +-
pulsar-client-cpp/lib/Authentication.cc | 14 +++---
.../lib/BatchAcknowledgementTracker.h | 6 +--
pulsar-client-cpp/lib/BinaryProtoLookupService.h | 4 +-
pulsar-client-cpp/lib/BlockingQueue.h | 17 +++----
pulsar-client-cpp/lib/ClientConnection.h | 6 +--
pulsar-client-cpp/lib/ClientImpl.cc | 5 +-
pulsar-client-cpp/lib/ClientImpl.h | 4 +-
pulsar-client-cpp/lib/Commands.cc | 17 +++----
pulsar-client-cpp/lib/CompressionCodecZLib.h | 1 -
pulsar-client-cpp/lib/ConnectionPool.cc | 2 +-
pulsar-client-cpp/lib/ConnectionPool.h | 4 +-
pulsar-client-cpp/lib/ConsumerImpl.cc | 6 +--
pulsar-client-cpp/lib/ConsumerImpl.h | 4 +-
pulsar-client-cpp/lib/ExecutorService.h | 6 +--
pulsar-client-cpp/lib/Future.h | 11 +++--
pulsar-client-cpp/lib/HandlerBase.h | 6 +--
pulsar-client-cpp/lib/Latch.cc | 13 -----
pulsar-client-cpp/lib/Latch.h | 24 +++++++---
pulsar-client-cpp/lib/LogUtils.h | 4 +-
pulsar-client-cpp/lib/LookupDataResult.h | 3 ++
pulsar-client-cpp/lib/LookupService.h | 2 +
pulsar-client-cpp/lib/MessageCrypto.h | 6 +--
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 4 +-
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h | 6 +--
pulsar-client-cpp/lib/ObjectPool.h | 16 +++----
pulsar-client-cpp/lib/PartitionedConsumerImpl.cc | 4 +-
pulsar-client-cpp/lib/PartitionedConsumerImpl.h | 6 +--
pulsar-client-cpp/lib/PartitionedProducerImpl.h | 6 +--
pulsar-client-cpp/lib/ProducerImpl.h | 4 +-
pulsar-client-cpp/lib/RoundRobinMessageRouter.h | 6 +--
pulsar-client-cpp/lib/TopicName.cc | 4 +-
pulsar-client-cpp/lib/TopicName.h | 4 +-
.../lib/UnAckedMessageTrackerEnabled.cc | 12 ++---
.../lib/UnAckedMessageTrackerEnabled.h | 5 +-
.../lib/UnAckedMessageTrackerInterface.h | 3 --
pulsar-client-cpp/lib/UnboundedBlockingQueue.h | 16 +++----
pulsar-client-cpp/lib/Utils.h | 3 ++
pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc | 8 ++--
pulsar-client-cpp/lib/auth/athenz/ZTSClient.h | 2 +-
pulsar-client-cpp/lib/stats/ConsumerStatsImpl.h | 2 +-
pulsar-client-cpp/lib/stats/ProducerStatsImpl.h | 7 +--
pulsar-client-cpp/perf/PerfConsumer.cc | 7 +--
pulsar-client-cpp/perf/PerfProducer.cc | 55 ++++++++++++----------
pulsar-client-cpp/perf/RateLimiter.h | 6 +--
pulsar-client-cpp/tests/AuthPluginTest.cc | 6 +--
pulsar-client-cpp/tests/AuthTokenTest.cc | 1 -
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 12 ++---
pulsar-client-cpp/tests/BatchMessageTest.cc | 1 -
pulsar-client-cpp/tests/BlockingQueueTest.cc | 17 +++----
pulsar-client-cpp/tests/ConsumerStatsTest.cc | 4 +-
pulsar-client-cpp/tests/LatchTest.cc | 42 ++++++++---------
pulsar-client-cpp/tests/ZeroQueueSizeTest.cc | 2 +-
53 files changed, 221 insertions(+), 217 deletions(-)
diff --git a/pulsar-client-cpp/CMakeLists.txt b/pulsar-client-cpp/CMakeLists.txt
index 57aaf27..f122c79 100644
--- a/pulsar-client-cpp/CMakeLists.txt
+++ b/pulsar-client-cpp/CMakeLists.txt
@@ -106,7 +106,7 @@ else()
endif (USE_LOG4CXX)
endif (LINK_STATIC)
-find_package(Boost REQUIRED COMPONENTS program_options filesystem regex thread system)
+find_package(Boost REQUIRED COMPONENTS program_options filesystem regex system)
if (BUILD_PYTHON_WRAPPER)
find_package(PythonLibs REQUIRED)
diff --git a/pulsar-client-cpp/lib/Authentication.cc b/pulsar-client-cpp/lib/Authentication.cc
index 3b9c45f..f552b53 100644
--- a/pulsar-client-cpp/lib/Authentication.cc
+++ b/pulsar-client-cpp/lib/Authentication.cc
@@ -29,7 +29,7 @@
#include <iostream>
#include <dlfcn.h>
#include <cstdlib>
-#include <boost/thread.hpp>
+#include <mutex>
#include <boost/algorithm/string.hpp>
DECLARE_LOG_OBJECT()
@@ -103,12 +103,12 @@ AuthenticationPtr AuthFactory::create(const std::string& pluginNameOrDynamicLibP
return AuthFactory::create(pluginNameOrDynamicLibPath, params);
}
-boost::mutex mutex;
+std::mutex mutex;
std::vector<void*> AuthFactory::loadedLibrariesHandles_;
bool AuthFactory::isShutdownHookRegistered_ = false;
void AuthFactory::release_handles() {
- boost::lock_guard<boost::mutex> lock(mutex);
+ std::lock_guard<std::mutex> lock(mutex);
for (std::vector<void*>::iterator ite = AuthFactory::loadedLibrariesHandles_.begin();
ite != AuthFactory::loadedLibrariesHandles_.end(); ite++) {
dlclose(*ite);
@@ -147,7 +147,7 @@ AuthenticationPtr tryCreateBuiltinAuth(const std::string& pluginName, const std:
AuthenticationPtr AuthFactory::create(const std::string& pluginNameOrDynamicLibPath,
const std::string& authParamsString) {
{
- boost::lock_guard<boost::mutex> lock(mutex);
+ std::lock_guard<std::mutex> lock(mutex);
if (!AuthFactory::isShutdownHookRegistered_) {
atexit(release_handles);
AuthFactory::isShutdownHookRegistered_ = true;
@@ -163,7 +163,7 @@ AuthenticationPtr AuthFactory::create(const std::string& pluginNameOrDynamicLibP
void* handle = dlopen(pluginNameOrDynamicLibPath.c_str(), RTLD_LAZY);
if (handle != NULL) {
{
- boost::lock_guard<boost::mutex> lock(mutex);
+ std::lock_guard<std::mutex> lock(mutex);
loadedLibrariesHandles_.push_back(handle);
}
Authentication* (*createAuthentication)(const std::string&);
@@ -183,7 +183,7 @@ AuthenticationPtr AuthFactory::create(const std::string& pluginNameOrDynamicLibP
AuthenticationPtr AuthFactory::create(const std::string& pluginNameOrDynamicLibPath, ParamMap& params) {
{
- boost::lock_guard<boost::mutex> lock(mutex);
+ std::lock_guard<std::mutex> lock(mutex);
if (!AuthFactory::isShutdownHookRegistered_) {
atexit(release_handles);
AuthFactory::isShutdownHookRegistered_ = true;
@@ -198,7 +198,7 @@ AuthenticationPtr AuthFactory::create(const std::string& pluginNameOrDynamicLibP
Authentication* auth = NULL;
void* handle = dlopen(pluginNameOrDynamicLibPath.c_str(), RTLD_LAZY);
if (handle != NULL) {
- boost::lock_guard<boost::mutex> lock(mutex);
+ std::lock_guard<std::mutex> lock(mutex);
loadedLibrariesHandles_.push_back(handle);
Authentication* (*createAuthentication)(ParamMap&);
*(void**)(&createAuthentication) = dlsym(handle, "createFromMap");
diff --git a/pulsar-client-cpp/lib/BatchAcknowledgementTracker.h b/pulsar-client-cpp/lib/BatchAcknowledgementTracker.h
index 6b20c00..6a709b3 100644
--- a/pulsar-client-cpp/lib/BatchAcknowledgementTracker.h
+++ b/pulsar-client-cpp/lib/BatchAcknowledgementTracker.h
@@ -21,7 +21,7 @@
#include "MessageImpl.h"
#include <map>
-#include <boost/thread/mutex.hpp>
+#include <mutex>
#include <boost/dynamic_bitset.hpp>
#include <lib/PulsarApi.pb.h>
#include <algorithm>
@@ -34,10 +34,10 @@ class ConsumerImpl;
class BatchAcknowledgementTracker {
private:
- typedef boost::unique_lock<boost::mutex> Lock;
+ typedef std::unique_lock<std::mutex> Lock;
typedef std::pair<MessageId, boost::dynamic_bitset<> > TrackerPair;
typedef std::map<MessageId, boost::dynamic_bitset<> > TrackerMap;
- boost::mutex mutex_;
+ std::mutex mutex_;
TrackerMap trackerMap_;
diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.h b/pulsar-client-cpp/lib/BinaryProtoLookupService.h
index fe193f4..7a9398a 100644
--- a/pulsar-client-cpp/lib/BinaryProtoLookupService.h
+++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.h
@@ -24,6 +24,8 @@
#include "ConnectionPool.h"
#include "Backoff.h"
#include <lib/LookupService.h>
+#include <mutex>
+
#pragma GCC visibility push(default)
namespace pulsar {
@@ -43,7 +45,7 @@ class BinaryProtoLookupService : public LookupService {
Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName);
private:
- boost::mutex mutex_;
+ std::mutex mutex_;
uint64_t requestIdGenerator_;
std::string serviceUrl_;
diff --git a/pulsar-client-cpp/lib/BlockingQueue.h b/pulsar-client-cpp/lib/BlockingQueue.h
index 44bfac6..39f8511 100644
--- a/pulsar-client-cpp/lib/BlockingQueue.h
+++ b/pulsar-client-cpp/lib/BlockingQueue.h
@@ -19,8 +19,8 @@
#ifndef LIB_BLOCKINGQUEUE_H_
#define LIB_BLOCKINGQUEUE_H_
-#include <boost/thread/mutex.hpp>
-#include <boost/thread/condition.hpp>
+#include <mutex>
+#include <condition_variable>
#include <boost/circular_buffer.hpp>
/**
@@ -199,9 +199,10 @@ class BlockingQueue {
}
}
- bool pop(T& value, const boost::posix_time::time_duration& timeout) {
+ template <typename Duration>
+ bool pop(T& value, const Duration& timeout) {
Lock lock(mutex_);
- if (!queueEmptyCondition.timed_wait(lock, timeout, QueueNotEmpty<BlockingQueue<T> >(*this))) {
+ if (!queueEmptyCondition.wait_for(lock, timeout, QueueNotEmpty<BlockingQueue<T> >(*this))) {
return false;
}
@@ -279,13 +280,13 @@ class BlockingQueue {
bool isFullNoMutex() const { return (queue_.size() + reservedSpots_) == maxSize_; }
const size_t maxSize_;
- mutable boost::mutex mutex_;
- boost::condition_variable queueFullCondition;
- boost::condition_variable queueEmptyCondition;
+ mutable std::mutex mutex_;
+ std::condition_variable queueFullCondition;
+ std::condition_variable queueEmptyCondition;
Container queue_;
int reservedSpots_;
- typedef boost::unique_lock<boost::mutex> Lock;
+ typedef std::unique_lock<std::mutex> Lock;
friend class QueueReservedSpot;
friend struct QueueNotEmpty<BlockingQueue<T> >;
friend struct QueueNotFull<BlockingQueue<T> >;
diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h
index e1cb734..38e3803 100644
--- a/pulsar-client-cpp/lib/ClientConnection.h
+++ b/pulsar-client-cpp/lib/ClientConnection.h
@@ -24,7 +24,7 @@
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/any.hpp>
-#include <boost/thread/mutex.hpp>
+#include <mutex>
#include <functional>
#include <string>
#include <vector>
@@ -288,8 +288,8 @@ class ClientConnection : public std::enable_shared_from_this<ClientConnection> {
typedef std::map<long, Promise<Result, NamespaceTopicsPtr>> PendingGetNamespaceTopicsMap;
PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_;
- boost::mutex mutex_;
- typedef boost::unique_lock<boost::mutex> Lock;
+ std::mutex mutex_;
+ typedef std::unique_lock<std::mutex> Lock;
// Pending buffers to write on the socket
std::deque<boost::any> pendingWriteBuffers_;
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index 8fa9120..f3b193d 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -27,16 +27,15 @@
#include "MultiTopicsConsumerImpl.h"
#include "PatternMultiTopicsConsumerImpl.h"
#include "SimpleLoggerImpl.h"
-#include "Log4CxxLogger.h"
#include <boost/bind.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <sstream>
#include <openssl/sha.h>
-#include "boost/date_time/posix_time/posix_time.hpp"
#include <lib/HTTPLookupService.h>
#include <lib/TopicName.h>
#include <algorithm>
#include <regex>
+#include <mutex>
DECLARE_LOG_OBJECT()
@@ -62,7 +61,7 @@ const std::string generateRandomName() {
return hexHash.str();
}
-typedef boost::unique_lock<boost::mutex> Lock;
+typedef std::unique_lock<std::mutex> Lock;
typedef std::vector<std::string> StringList;
diff --git a/pulsar-client-cpp/lib/ClientImpl.h b/pulsar-client-cpp/lib/ClientImpl.h
index 7d3b879..46b7c2b 100644
--- a/pulsar-client-cpp/lib/ClientImpl.h
+++ b/pulsar-client-cpp/lib/ClientImpl.h
@@ -24,7 +24,7 @@
#include "BinaryProtoLookupService.h"
#include "ConnectionPool.h"
#include "LookupDataResult.h"
-#include <boost/thread/mutex.hpp>
+#include <mutex>
#include <lib/TopicName.h>
#include "ProducerImplBase.h"
#include "ConsumerImplBase.h"
@@ -125,7 +125,7 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
Closed
};
- boost::mutex mutex_;
+ std::mutex mutex_;
State state_;
std::string serviceUrl_;
diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index 4c80154..384e54c 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -28,7 +28,7 @@
#include <pulsar/Schema.h>
#include "checksum/ChecksumProvider.h"
#include <algorithm>
-#include <boost/thread/mutex.hpp>
+#include <mutex>
using namespace pulsar;
namespace pulsar {
@@ -98,22 +98,21 @@ SharedBuffer Commands::writeMessageWithSize(const BaseCommand& cmd) {
SharedBuffer Commands::newPartitionMetadataRequest(const std::string& topic, uint64_t requestId) {
static BaseCommand cmd;
- static boost::mutex mutex;
- mutex.lock();
+ static std::mutex mutex;
+ std::lock_guard<std::mutex> lock(mutex);
cmd.set_type(BaseCommand::PARTITIONED_METADATA);
CommandPartitionedTopicMetadata* partitionMetadata = cmd.mutable_partitionmetadata();
partitionMetadata->set_topic(topic);
partitionMetadata->set_request_id(requestId);
const SharedBuffer buffer = writeMessageWithSize(cmd);
cmd.clear_partitionmetadata();
- mutex.unlock();
return buffer;
}
SharedBuffer Commands::newLookup(const std::string& topic, const bool authoritative, uint64_t requestId) {
static BaseCommand cmd;
- static boost::mutex mutex;
- mutex.lock();
+ static std::mutex mutex;
+ std::lock_guard<std::mutex> lock(mutex);
cmd.set_type(BaseCommand::LOOKUP);
CommandLookupTopic* lookup = cmd.mutable_lookuptopic();
lookup->set_topic(topic);
@@ -121,21 +120,19 @@ SharedBuffer Commands::newLookup(const std::string& topic, const bool authoritat
lookup->set_request_id(requestId);
const SharedBuffer buffer = writeMessageWithSize(cmd);
cmd.clear_lookuptopic();
- mutex.unlock();
return buffer;
}
SharedBuffer Commands::newConsumerStats(uint64_t consumerId, uint64_t requestId) {
static BaseCommand cmd;
- static boost::mutex mutex;
- mutex.lock();
+ static std::mutex mutex;
+ std::lock_guard<std::mutex> lock(mutex);
cmd.set_type(BaseCommand::CONSUMER_STATS);
CommandConsumerStats* consumerStats = cmd.mutable_consumerstats();
consumerStats->set_consumer_id(consumerId);
consumerStats->set_request_id(requestId);
const SharedBuffer buffer = writeMessageWithSize(cmd);
cmd.clear_consumerstats();
- mutex.unlock();
return buffer;
}
diff --git a/pulsar-client-cpp/lib/CompressionCodecZLib.h b/pulsar-client-cpp/lib/CompressionCodecZLib.h
index 08fd0c9..41234ac 100644
--- a/pulsar-client-cpp/lib/CompressionCodecZLib.h
+++ b/pulsar-client-cpp/lib/CompressionCodecZLib.h
@@ -21,7 +21,6 @@
#include "CompressionCodec.h"
#include <zlib.h>
-#include <boost/thread/mutex.hpp>
// Make symbol visible to unit tests
#pragma GCC visibility push(default)
diff --git a/pulsar-client-cpp/lib/ConnectionPool.cc b/pulsar-client-cpp/lib/ConnectionPool.cc
index 36ce4a5..993731e 100644
--- a/pulsar-client-cpp/lib/ConnectionPool.cc
+++ b/pulsar-client-cpp/lib/ConnectionPool.cc
@@ -35,7 +35,7 @@ ConnectionPool::ConnectionPool(const ClientConfiguration& conf, ExecutorServiceP
Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
const std::string& logicalAddress, const std::string& physicalAddress) {
- boost::unique_lock<boost::mutex> lock(mutex_);
+ std::unique_lock<std::mutex> lock(mutex_);
if (poolConnections_) {
PoolMap::iterator cnxIt = pool_.find(logicalAddress);
diff --git a/pulsar-client-cpp/lib/ConnectionPool.h b/pulsar-client-cpp/lib/ConnectionPool.h
index 471454b..50441a9 100644
--- a/pulsar-client-cpp/lib/ConnectionPool.h
+++ b/pulsar-client-cpp/lib/ConnectionPool.h
@@ -25,7 +25,7 @@
#include <string>
#include <map>
-#include <boost/thread/mutex.hpp>
+#include <mutex>
#pragma GCC visibility push(default)
namespace pulsar {
@@ -62,7 +62,7 @@ class ConnectionPool {
typedef std::map<std::string, ClientConnectionWeakPtr> PoolMap;
PoolMap pool_;
bool poolConnections_;
- boost::mutex mutex_;
+ std::mutex mutex_;
friend class ConnectionPoolTest;
};
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 11dec26..a2cabf8 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -497,7 +497,7 @@ void ConsumerImpl::internalListener() {
}
lock.unlock();
Message msg;
- if (!incomingMessages_.pop(msg, boost::posix_time::milliseconds(0))) {
+ if (!incomingMessages_.pop(msg, std::chrono::milliseconds(0))) {
// This will only happen when the connection got reset and we cleared the queue
return;
}
@@ -572,7 +572,7 @@ void ConsumerImpl::receiveAsync(ReceiveCallback& callback) {
stateLock.unlock();
Lock lock(pendingReceiveMutex_);
- if (incomingMessages_.pop(msg, milliseconds(0))) {
+ if (incomingMessages_.pop(msg, std::chrono::milliseconds(0))) {
lock.unlock();
messageProcessed(msg);
unAckedMessageTrackerPtr_->add(msg.getMessageId());
@@ -637,7 +637,7 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) {
return ResultInvalidConfiguration;
}
- if (incomingMessages_.pop(msg, milliseconds(timeout))) {
+ if (incomingMessages_.pop(msg, std::chrono::milliseconds(timeout))) {
messageProcessed(msg);
unAckedMessageTrackerPtr_->add(msg.getMessageId());
return ResultOk;
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index 7492902..01aa55f 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -142,7 +142,7 @@ class ConsumerImpl : public ConsumerImplBase,
Optional<MessageId> clearReceiveQueue();
- boost::mutex mutexForReceiveWithZeroQueueSize;
+ std::mutex mutexForReceiveWithZeroQueueSize;
const ConsumerConfiguration config_;
const std::string subscription_;
std::string originalSubscriptionName_;
@@ -163,7 +163,7 @@ class ConsumerImpl : public ConsumerImplBase,
int32_t partitionIndex_;
Promise<Result, ConsumerImplBaseWeakPtr> consumerCreatedPromise_;
bool messageListenerRunning_;
- boost::mutex messageListenerMutex_;
+ std::mutex messageListenerMutex_;
CompressionCodecProvider compressionCodecProvider_;
UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
BatchAcknowledgementTracker batchAcknowledgementTracker_;
diff --git a/pulsar-client-cpp/lib/ExecutorService.h b/pulsar-client-cpp/lib/ExecutorService.h
index 2faceca..545abe5 100644
--- a/pulsar-client-cpp/lib/ExecutorService.h
+++ b/pulsar-client-cpp/lib/ExecutorService.h
@@ -24,7 +24,7 @@
#include <boost/asio/ssl.hpp>
#include <functional>
#include <boost/noncopyable.hpp>
-#include <boost/thread/mutex.hpp>
+#include <mutex>
#pragma GCC visibility push(default)
@@ -87,8 +87,8 @@ class ExecutorServiceProvider {
typedef std::vector<ExecutorServicePtr> ExecutorList;
ExecutorList executors_;
int executorIdx_;
- boost::mutex mutex_;
- typedef boost::unique_lock<boost::mutex> Lock;
+ std::mutex mutex_;
+ typedef std::unique_lock<std::mutex> Lock;
};
typedef std::shared_ptr<ExecutorServiceProvider> ExecutorServiceProviderPtr;
diff --git a/pulsar-client-cpp/lib/Future.h b/pulsar-client-cpp/lib/Future.h
index c0e4c6e..d604645 100644
--- a/pulsar-client-cpp/lib/Future.h
+++ b/pulsar-client-cpp/lib/Future.h
@@ -20,21 +20,22 @@
#define LIB_FUTURE_H_
#include <functional>
-#include <boost/thread/mutex.hpp>
-#include <boost/thread/condition_variable.hpp>
+#include <mutex>
+#include <memory>
+#include <condition_variable>
#include <list>
#pragma GCC visibility push(default)
-typedef boost::unique_lock<boost::mutex> Lock;
+typedef std::unique_lock<std::mutex> Lock;
namespace pulsar {
template <typename Result, typename Type>
struct InternalState {
- boost::mutex mutex;
- boost::condition_variable condition;
+ std::mutex mutex;
+ std::condition_variable condition;
Result result;
Type value;
bool complete;
diff --git a/pulsar-client-cpp/lib/HandlerBase.h b/pulsar-client-cpp/lib/HandlerBase.h
index bcc824e..56f5ef3 100644
--- a/pulsar-client-cpp/lib/HandlerBase.h
+++ b/pulsar-client-cpp/lib/HandlerBase.h
@@ -90,12 +90,12 @@ class HandlerBase {
ClientImplWeakPtr client_;
const std::string topic_;
ClientConnectionWeakPtr connection_;
- boost::mutex mutex_;
- boost::mutex pendingReceiveMutex_;
+ std::mutex mutex_;
+ std::mutex pendingReceiveMutex_;
ptime creationTimestamp_;
const TimeDuration operationTimeut_;
- typedef boost::unique_lock<boost::mutex> Lock;
+ typedef std::unique_lock<std::mutex> Lock;
enum State
{
diff --git a/pulsar-client-cpp/lib/Latch.cc b/pulsar-client-cpp/lib/Latch.cc
index 3247940..c9cfab8 100644
--- a/pulsar-client-cpp/lib/Latch.cc
+++ b/pulsar-client-cpp/lib/Latch.cc
@@ -20,14 +20,6 @@
namespace pulsar {
-struct CountIsZero {
- const int& count_;
-
- CountIsZero(const int& count) : count_(count) {}
-
- bool operator()() const { return count_ == 0; }
-};
-
Latch::Latch(int count) : state_(std::make_shared<InternalState>()) { state_->count = count; }
void Latch::countdown() {
@@ -52,9 +44,4 @@ void Latch::wait() {
state_->condition.wait(lock, CountIsZero(state_->count));
}
-bool Latch::wait(const boost::posix_time::time_duration& timeout) {
- Lock lock(state_->mutex);
- return state_->condition.timed_wait(lock, timeout, CountIsZero(state_->count));
-}
-
} /* namespace pulsar */
diff --git a/pulsar-client-cpp/lib/Latch.h b/pulsar-client-cpp/lib/Latch.h
index 360554c..f6e7dea 100644
--- a/pulsar-client-cpp/lib/Latch.h
+++ b/pulsar-client-cpp/lib/Latch.h
@@ -20,8 +20,8 @@
#define LIB_LATCH_H_
#include <memory>
-#include <boost/thread/mutex.hpp>
-#include <boost/thread/condition_variable.hpp>
+#include <mutex>
+#include <condition_variable>
#pragma GCC visibility push(default)
@@ -35,18 +35,30 @@ class Latch {
void wait();
- bool wait(const boost::posix_time::time_duration& timeout);
+ template <typename Duration>
+ bool wait(const Duration& timeout) {
+ Lock lock(state_->mutex);
+ return state_->condition.wait_for(lock, timeout, CountIsZero(state_->count));
+ }
int getCount();
private:
struct InternalState {
- boost::mutex mutex;
- boost::condition_variable condition;
+ std::mutex mutex;
+ std::condition_variable condition;
int count;
};
- typedef boost::unique_lock<boost::mutex> Lock;
+ struct CountIsZero {
+ const int& count_;
+
+ CountIsZero(const int& count) : count_(count) {}
+
+ bool operator()() const { return count_ == 0; }
+ };
+
+ typedef std::unique_lock<std::mutex> Lock;
std::shared_ptr<InternalState> state_;
};
typedef std::shared_ptr<Latch> LatchPtr;
diff --git a/pulsar-client-cpp/lib/LogUtils.h b/pulsar-client-cpp/lib/LogUtils.h
index 81de443..e4d94ca 100644
--- a/pulsar-client-cpp/lib/LogUtils.h
+++ b/pulsar-client-cpp/lib/LogUtils.h
@@ -19,9 +19,9 @@
#pragma once
-#include <boost/thread/tss.hpp>
#include <string>
#include <sstream>
+#include <memory>
#include <pulsar/Logger.h>
@@ -31,7 +31,7 @@ namespace pulsar {
#define DECLARE_LOG_OBJECT() \
static pulsar::Logger* logger() { \
- static boost::thread_specific_ptr<pulsar::Logger> threadSpecificLogPtr; \
+ static thread_local std::unique_ptr<pulsar::Logger> threadSpecificLogPtr; \
pulsar::Logger* ptr = threadSpecificLogPtr.get(); \
if (PULSAR_UNLIKELY(!ptr)) { \
std::string logger = pulsar::LogUtils::getLoggerName(__FILE__); \
diff --git a/pulsar-client-cpp/lib/LookupDataResult.h b/pulsar-client-cpp/lib/LookupDataResult.h
index ef9b091..b48b854 100644
--- a/pulsar-client-cpp/lib/LookupDataResult.h
+++ b/pulsar-client-cpp/lib/LookupDataResult.h
@@ -22,6 +22,9 @@
#include <lib/Future.h>
#include <pulsar/Result.h>
+#include <iostream>
+#include <memory>
+
namespace pulsar {
class LookupDataResult;
typedef std::shared_ptr<LookupDataResult> LookupDataResultPtr;
diff --git a/pulsar-client-cpp/lib/LookupService.h b/pulsar-client-cpp/lib/LookupService.h
index 5a899a4..2fdb51a 100644
--- a/pulsar-client-cpp/lib/LookupService.h
+++ b/pulsar-client-cpp/lib/LookupService.h
@@ -25,6 +25,8 @@
#include <lib/LogUtils.h>
#include <lib/TopicName.h>
+#include <vector>
+
namespace pulsar {
typedef std::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;
typedef Promise<Result, NamespaceTopicsPtr> NamespaceTopicsPromise;
diff --git a/pulsar-client-cpp/lib/MessageCrypto.h b/pulsar-client-cpp/lib/MessageCrypto.h
index 566313f..f44f37f 100644
--- a/pulsar-client-cpp/lib/MessageCrypto.h
+++ b/pulsar-client-cpp/lib/MessageCrypto.h
@@ -22,7 +22,7 @@
#include <iostream>
#include <map>
#include <set>
-#include <boost/thread/mutex.hpp>
+#include <mutex>
#include <boost/scoped_array.hpp>
#include <openssl/ssl.h>
@@ -96,8 +96,8 @@ class MessageCrypto {
const CryptoKeyReaderPtr keyReader, SharedBuffer& decryptedPayload);
private:
- typedef boost::unique_lock<boost::mutex> Lock;
- boost::mutex mutex_;
+ typedef std::unique_lock<std::mutex> Lock;
+ std::mutex mutex_;
int dataKeyLen_;
boost::scoped_array<unsigned char> dataKey_;
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 2b16f62..90aa9bf 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -498,7 +498,7 @@ Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) {
return ResultInvalidConfiguration;
}
- if (messages_.pop(msg, milliseconds(timeout))) {
+ if (messages_.pop(msg, std::chrono::milliseconds(timeout))) {
lock.unlock();
unAckedMessageTrackerPtr_->add(msg.getMessageId());
return ResultOk;
@@ -519,7 +519,7 @@ void MultiTopicsConsumerImpl::receiveAsync(ReceiveCallback& callback) {
stateLock.unlock();
Lock lock(pendingReceiveMutex_);
- if (messages_.pop(msg, milliseconds(0))) {
+ if (messages_.pop(msg, std::chrono::milliseconds(0))) {
lock.unlock();
unAckedMessageTrackerPtr_->add(msg.getMessageId());
callback(ResultOk, msg);
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index 24ef7dd..74350b9 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -23,7 +23,7 @@
#include "BlockingQueue.h"
#include <vector>
#include <queue>
-#include <boost/thread/mutex.hpp>
+#include <mutex>
#include "ConsumerImplBase.h"
#include "lib/UnAckedMessageTrackerDisabled.h"
@@ -90,8 +90,8 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
typedef std::map<std::string, ConsumerImplPtr> ConsumerMap;
ConsumerMap consumers_;
std::map<std::string, int> topicsPartitions_;
- boost::mutex mutex_;
- boost::mutex pendingReceiveMutex_;
+ std::mutex mutex_;
+ std::mutex pendingReceiveMutex_;
MultiTopicsConsumerState state_;
std::shared_ptr<std::atomic<int>> numberTopicPartitions_;
LookupServicePtr lookupServicePtr_;
diff --git a/pulsar-client-cpp/lib/ObjectPool.h b/pulsar-client-cpp/lib/ObjectPool.h
index 9fa6ca1..3cf39f2 100644
--- a/pulsar-client-cpp/lib/ObjectPool.h
+++ b/pulsar-client-cpp/lib/ObjectPool.h
@@ -20,10 +20,8 @@
#define LIB_OBJECTPOOL_H_
#include <algorithm>
-#include <boost/thread/locks.hpp>
-#include <boost/thread/mutex.hpp>
+#include <mutex>
#include <memory>
-#include <boost/thread/tss.hpp>
namespace pulsar {
@@ -34,7 +32,7 @@ class Allocator {
class Impl {
public:
// cheap lock to acquire
- static boost::mutex mutex_;
+ static std::mutex mutex_;
// note: use std::forward_list<> when switching to C++11 mode
struct Node {
@@ -59,7 +57,7 @@ class Allocator {
void* pop() {
if (!head_) {
// size = 0
- boost::lock_guard<boost::mutex> lock(mutex_);
+ std::lock_guard<std::mutex> lock(mutex_);
if (!globalPool_) {
return NULL;
@@ -86,7 +84,7 @@ class Allocator {
bool deleteList = true;
{
// Move the entries to global pool
- boost::lock_guard<boost::mutex> lock(mutex_);
+ std::lock_guard<std::mutex> lock(mutex_);
// If total node count reached max allowed cache limit,
// skip adding to global pool.
@@ -148,7 +146,7 @@ class Allocator {
}
};
- static boost::thread_specific_ptr<Impl> implPtr_;
+ static thread_local std::unique_ptr<Impl> implPtr_;
typedef T value_type;
typedef size_t size_type;
typedef T* pointer;
@@ -192,10 +190,10 @@ class Allocator {
// typename Allocator<Type,MaxSize>::Impl is important else the compiler
// doesn't understand that it is a type
template <typename Type, int MaxSize>
-boost::thread_specific_ptr<typename Allocator<Type, MaxSize>::Impl> Allocator<Type, MaxSize>::implPtr_;
+thread_local std::unique_ptr<typename Allocator<Type, MaxSize>::Impl> Allocator<Type, MaxSize>::implPtr_;
template <typename Type, int MaxSize>
-boost::mutex Allocator<Type, MaxSize>::Impl::mutex_;
+std::mutex Allocator<Type, MaxSize>::Impl::mutex_;
template <typename Type, int MaxSize>
struct Allocator<Type, MaxSize>::Impl::GlobalPool* Allocator<Type, MaxSize>::Impl::globalPool_;
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index 6ad847e..6edf06f 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -88,7 +88,7 @@ Result PartitionedConsumerImpl::receive(Message& msg, int timeout) {
return ResultInvalidConfiguration;
}
- if (messages_.pop(msg, milliseconds(timeout))) {
+ if (messages_.pop(msg, std::chrono::milliseconds(timeout))) {
unAckedMessageTrackerPtr_->add(msg.getMessageId());
return ResultOk;
} else {
@@ -108,7 +108,7 @@ void PartitionedConsumerImpl::receiveAsync(ReceiveCallback& callback) {
stateLock.unlock();
Lock lock(pendingReceiveMutex_);
- if (messages_.pop(msg, milliseconds(0))) {
+ if (messages_.pop(msg, std::chrono::milliseconds(0))) {
lock.unlock();
unAckedMessageTrackerPtr_->add(msg.getMessageId());
callback(ResultOk, msg);
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
index f4274b1..ec79529 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
@@ -23,7 +23,7 @@
#include <vector>
#include <queue>
-#include <boost/thread/mutex.hpp>
+#include <mutex>
#include "ConsumerImplBase.h"
#include "lib/UnAckedMessageTrackerDisabled.h"
#include <lib/Latch.h>
@@ -80,8 +80,8 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
const ConsumerConfiguration conf_;
typedef std::vector<ConsumerImplPtr> ConsumerList;
ConsumerList consumers_;
- boost::mutex mutex_;
- boost::mutex pendingReceiveMutex_;
+ std::mutex mutex_;
+ std::mutex pendingReceiveMutex_;
PartitionedConsumerState state_;
unsigned int unsubscribedSoFar_;
BlockingQueue<Message> messages_;
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.h b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
index 11ca42e..72be5fe 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
@@ -20,7 +20,7 @@
#include "ClientImpl.h"
#include <vector>
-#include <boost/thread/mutex.hpp>
+#include <mutex>
#include <pulsar/MessageRoutingPolicy.h>
#include <pulsar/TopicMetadata.h>
#include <lib/TopicName.h>
@@ -40,7 +40,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
};
const static std::string PARTITION_NAME_SUFFIX;
- typedef boost::unique_lock<boost::mutex> Lock;
+ typedef std::unique_lock<std::mutex> Lock;
PartitionedProducerImpl(ClientImplPtr ptr, const TopicNamePtr topicName, const unsigned int numPartitions,
const ProducerConfiguration& config);
@@ -110,7 +110,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
MessageRoutingPolicyPtr routerPolicy_;
// mutex_ is used to share state_, and numProducersCreated_
- boost::mutex mutex_;
+ std::mutex mutex_;
PartitionedProducerState state_;
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h
index 0b95811..5e84007 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -19,7 +19,7 @@
#ifndef LIB_PRODUCERIMPL_H_
#define LIB_PRODUCERIMPL_H_
-#include <boost/thread/mutex.hpp>
+#include <mutex>
#include <boost/date_time/posix_time/ptime.hpp>
#include "ClientImpl.h"
@@ -133,7 +133,7 @@ class ProducerImpl : public HandlerBase,
bool encryptMessage(proto::MessageMetadata& metadata, SharedBuffer& payload,
SharedBuffer& encryptedPayload);
- typedef boost::unique_lock<boost::mutex> Lock;
+ typedef std::unique_lock<std::mutex> Lock;
ProducerConfiguration conf_;
diff --git a/pulsar-client-cpp/lib/RoundRobinMessageRouter.h b/pulsar-client-cpp/lib/RoundRobinMessageRouter.h
index 7c6f23c..8965349 100644
--- a/pulsar-client-cpp/lib/RoundRobinMessageRouter.h
+++ b/pulsar-client-cpp/lib/RoundRobinMessageRouter.h
@@ -22,7 +22,7 @@
#include <pulsar/MessageRoutingPolicy.h>
#include <pulsar/ProducerConfiguration.h>
#include <pulsar/TopicMetadata.h>
-#include <boost/thread/mutex.hpp>
+#include <mutex>
#include "Hash.h"
#include "MessageRouterBase.h"
@@ -35,10 +35,10 @@ class RoundRobinMessageRouter : public MessageRouterBase {
virtual int getPartition(const Message& msg, const TopicMetadata& topicMetadata);
private:
- boost::mutex mutex_;
+ std::mutex mutex_;
unsigned int prevPartition_;
};
-typedef boost::unique_lock<boost::mutex> Lock;
+typedef std::unique_lock<std::mutex> Lock;
} // namespace pulsar
#pragma GCC visibility pop
#endif // PULSAR_RR_MESSAGE_ROUTER_HEADER_
diff --git a/pulsar-client-cpp/lib/TopicName.cc b/pulsar-client-cpp/lib/TopicName.cc
index 828c971..84bff73 100644
--- a/pulsar-client-cpp/lib/TopicName.cc
+++ b/pulsar-client-cpp/lib/TopicName.cc
@@ -34,10 +34,10 @@
DECLARE_LOG_OBJECT()
namespace pulsar {
-typedef boost::unique_lock<boost::mutex> Lock;
+typedef std::unique_lock<std::mutex> Lock;
// static members
CURL* TopicName::curl = NULL;
-boost::mutex TopicName::curlHandleMutex;
+std::mutex TopicName::curlHandleMutex;
CURL* TopicName::getCurlHandle() {
if (curl == NULL) {
diff --git a/pulsar-client-cpp/lib/TopicName.h b/pulsar-client-cpp/lib/TopicName.h
index ad58116..fc8dccb 100644
--- a/pulsar-client-cpp/lib/TopicName.h
+++ b/pulsar-client-cpp/lib/TopicName.h
@@ -24,7 +24,7 @@
#include <string>
#include <curl/curl.h>
-#include <boost/thread/mutex.hpp>
+#include <mutex>
#pragma GCC visibility push(default)
@@ -59,7 +59,7 @@ class TopicName : public ServiceUnitId {
private:
static CURL* getCurlHandle();
static CURL* curl;
- static boost::mutex curlHandleMutex;
+ static std::mutex curlHandleMutex;
static bool parse(const std::string& topicName, std::string& domain, std::string& property,
std::string& cluster, std::string& namespacePortion, std::string& localName);
TopicName();
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
index ba9fc97..248a60d 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
@@ -40,7 +40,7 @@ void UnAckedMessageTrackerEnabled::timeoutHandler() {
}
void UnAckedMessageTrackerEnabled::timeoutHandlerHelper() {
- boost::unique_lock<boost::mutex> acquire(lock_);
+ std::lock_guard<std::mutex> acquire(lock_);
LOG_DEBUG("UnAckedMessageTrackerEnabled::timeoutHandlerHelper invoked for consumerPtr_ "
<< consumerReference_.getName().c_str());
if (!oldSet_.empty()) {
@@ -62,28 +62,28 @@ UnAckedMessageTrackerEnabled::UnAckedMessageTrackerEnabled(long timeoutMs, const
}
bool UnAckedMessageTrackerEnabled::add(const MessageId& m) {
- boost::unique_lock<boost::mutex> acquire(lock_);
+ std::lock_guard<std::mutex> acquire(lock_);
oldSet_.erase(m);
return currentSet_.insert(m).second;
}
bool UnAckedMessageTrackerEnabled::isEmpty() {
- boost::unique_lock<boost::mutex> acquire(lock_);
+ std::lock_guard<std::mutex> acquire(lock_);
return oldSet_.empty() && currentSet_.empty();
}
bool UnAckedMessageTrackerEnabled::remove(const MessageId& m) {
- boost::unique_lock<boost::mutex> acquire(lock_);
+ std::lock_guard<std::mutex> acquire(lock_);
return oldSet_.erase(m) || currentSet_.erase(m);
}
long UnAckedMessageTrackerEnabled::size() {
- boost::unique_lock<boost::mutex> acquire(lock_);
+ std::lock_guard<std::mutex> acquire(lock_);
return oldSet_.size() + currentSet_.size();
}
void UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) {
- boost::unique_lock<boost::mutex> acquire(lock_);
+ std::lock_guard<std::mutex> acquire(lock_);
for (std::set<MessageId>::iterator it = oldSet_.begin(); it != oldSet_.end();) {
if (*it < msgId && it->partition() == msgId.partition()) {
oldSet_.erase(it++);
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
index 7bea00d..ea92305 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
@@ -19,6 +19,9 @@
#ifndef LIB_UNACKEDMESSAGETRACKERENABLED_H_
#define LIB_UNACKEDMESSAGETRACKERENABLED_H_
#include "lib/UnAckedMessageTrackerInterface.h"
+
+#include <mutex>
+
namespace pulsar {
class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
@@ -40,7 +43,7 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
long size();
std::set<MessageId> currentSet_;
std::set<MessageId> oldSet_;
- boost::mutex lock_;
+ std::mutex lock_;
DeadlineTimerPtr timer_;
ConsumerImplBase& consumerReference_;
ClientImplPtr client_;
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h b/pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h
index e03b0a2..ea49912 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h
@@ -24,15 +24,12 @@
#include <algorithm>
#include <utility>
#include "pulsar/MessageId.h"
-#include <boost/thread/locks.hpp>
-#include <boost/thread/shared_mutex.hpp>
#include "lib/ClientImpl.h"
#include "lib/ConsumerImplBase.h"
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <lib/LogUtils.h>
#include "lib/PulsarApi.pb.h"
-#include <boost/thread/recursive_mutex.hpp>
#include <boost/asio/error.hpp>
namespace pulsar {
diff --git a/pulsar-client-cpp/lib/UnboundedBlockingQueue.h b/pulsar-client-cpp/lib/UnboundedBlockingQueue.h
index 69325f0..841be30 100644
--- a/pulsar-client-cpp/lib/UnboundedBlockingQueue.h
+++ b/pulsar-client-cpp/lib/UnboundedBlockingQueue.h
@@ -19,8 +19,8 @@
#ifndef LIB_UNBOUNDEDBLOCKINGQUEUE_H_
#define LIB_UNBOUNDEDBLOCKINGQUEUE_H_
-#include <boost/thread/mutex.hpp>
-#include <boost/thread/condition.hpp>
+#include <mutex>
+#include <condition_variable>
#include <boost/circular_buffer.hpp>
// For struct QueueNotEmpty
#include "BlockingQueue.h"
@@ -72,10 +72,10 @@ class UnboundedBlockingQueue {
lock.unlock();
}
- bool pop(T& value, const boost::posix_time::time_duration& timeout) {
+ template <typename Duration>
+ bool pop(T& value, const Duration& timeout) {
Lock lock(mutex_);
- if (!queueEmptyCondition_.timed_wait(lock, timeout,
- QueueNotEmpty<UnboundedBlockingQueue<T> >(*this))) {
+ if (!queueEmptyCondition_.wait_for(lock, timeout, QueueNotEmpty<UnboundedBlockingQueue<T> >(*this))) {
return false;
}
@@ -136,11 +136,11 @@ class UnboundedBlockingQueue {
private:
bool isEmptyNoMutex() const { return queue_.empty(); }
- mutable boost::mutex mutex_;
- boost::condition_variable queueEmptyCondition_;
+ mutable std::mutex mutex_;
+ std::condition_variable queueEmptyCondition_;
Container queue_;
- typedef boost::unique_lock<boost::mutex> Lock;
+ typedef std::unique_lock<std::mutex> Lock;
friend struct QueueNotEmpty<UnboundedBlockingQueue<T> >;
};
diff --git a/pulsar-client-cpp/lib/Utils.h b/pulsar-client-cpp/lib/Utils.h
index d119951..620cf60 100644
--- a/pulsar-client-cpp/lib/Utils.h
+++ b/pulsar-client-cpp/lib/Utils.h
@@ -23,6 +23,9 @@
#include "Future.h"
+#include <map>
+#include <iostream>
+
namespace pulsar {
struct WaitForCallback {
diff --git a/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc b/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc
index 2212b5b..a4aa7aa 100644
--- a/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc
+++ b/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc
@@ -38,6 +38,8 @@
#include <boost/archive/iterators/base64_from_binary.hpp>
#include <boost/archive/iterators/transform_width.hpp>
+#include <mutex>
+
DECLARE_LOG_OBJECT()
namespace pulsar {
@@ -254,14 +256,14 @@ static size_t curlWriteCallback(void *contents, size_t size, size_t nmemb, void
return size * nmemb;
}
-static boost::mutex cacheMtx_;
+static std::mutex cacheMtx_;
const std::string ZTSClient::getRoleToken() const {
RoleToken roleToken;
std::string cacheKey = "p=" + tenantDomain_ + "." + tenantService_ + ";d=" + providerDomain_;
// locked block
{
- boost::lock_guard<boost::mutex> lock(cacheMtx_);
+ std::lock_guard<std::mutex> lock(cacheMtx_);
roleToken = roleTokenCache_[cacheKey];
}
@@ -328,7 +330,7 @@ const std::string ZTSClient::getRoleToken() const {
}
roleToken.token = root["token"].asString();
roleToken.expiryTime = root["expiryTime"].asUInt();
- boost::lock_guard<boost::mutex> lock(cacheMtx_);
+ std::lock_guard<std::mutex> lock(cacheMtx_);
roleTokenCache_[cacheKey] = roleToken;
LOG_DEBUG("Got role token " << roleToken.token)
} else {
diff --git a/pulsar-client-cpp/lib/auth/athenz/ZTSClient.h b/pulsar-client-cpp/lib/auth/athenz/ZTSClient.h
index 468ed02..d9b9abe 100644
--- a/pulsar-client-cpp/lib/auth/athenz/ZTSClient.h
+++ b/pulsar-client-cpp/lib/auth/athenz/ZTSClient.h
@@ -19,7 +19,7 @@
#include <string>
#include <map>
#include <lib/LogUtils.h>
-#include <boost/thread.hpp>
+
#pragma GCC visibility push(default)
namespace pulsar {
diff --git a/pulsar-client-cpp/lib/stats/ConsumerStatsImpl.h b/pulsar-client-cpp/lib/stats/ConsumerStatsImpl.h
index 646005d..4c8e8f6 100644
--- a/pulsar-client-cpp/lib/stats/ConsumerStatsImpl.h
+++ b/pulsar-client-cpp/lib/stats/ConsumerStatsImpl.h
@@ -39,7 +39,7 @@ class ConsumerStatsImpl : public ConsumerStatsBase {
std::string consumerStr_;
DeadlineTimerPtr timer_;
- boost::mutex mutex_;
+ std::mutex mutex_;
unsigned int statsIntervalInSeconds_;
friend std::ostream& operator<<(std::ostream&, const ConsumerStatsImpl&);
diff --git a/pulsar-client-cpp/lib/stats/ProducerStatsImpl.h b/pulsar-client-cpp/lib/stats/ProducerStatsImpl.h
index 6929efd..2212809 100644
--- a/pulsar-client-cpp/lib/stats/ProducerStatsImpl.h
+++ b/pulsar-client-cpp/lib/stats/ProducerStatsImpl.h
@@ -36,7 +36,8 @@
#include <boost/array.hpp>
#include <boost/date_time/local_time/local_time.hpp>
-#include <boost/array.hpp>
+#include <memory>
+#include <mutex>
#include <iostream>
#include <vector>
#include <lib/Utils.h>
@@ -49,7 +50,7 @@ typedef boost::accumulators::accumulator_set<
boost::accumulators::stats<boost::accumulators::tag::mean, boost::accumulators::tag::extended_p_square> >
LatencyAccumulator;
-class ProducerStatsImpl : public boost::enable_shared_from_this<ProducerStatsImpl>, public ProducerStatsBase {
+class ProducerStatsImpl : public std::enable_shared_from_this<ProducerStatsImpl>, public ProducerStatsBase {
private:
unsigned long numMsgsSent_;
unsigned long numBytesSent_;
@@ -63,7 +64,7 @@ class ProducerStatsImpl : public boost::enable_shared_from_this<ProducerStatsImp
std::string producerStr_;
DeadlineTimerPtr timer_;
- boost::mutex mutex_;
+ std::mutex mutex_;
unsigned int statsIntervalInSeconds_;
friend std::ostream& operator<<(std::ostream&, const ProducerStatsImpl&);
diff --git a/pulsar-client-cpp/perf/PerfConsumer.cc b/pulsar-client-cpp/perf/PerfConsumer.cc
index 06b168c..db316ad 100644
--- a/pulsar-client-cpp/perf/PerfConsumer.cc
+++ b/pulsar-client-cpp/perf/PerfConsumer.cc
@@ -23,10 +23,11 @@ DECLARE_LOG_OBJECT()
#include <thread>
#include <iostream>
#include <fstream>
+#include <mutex>
using namespace std::chrono;
-#include <boost/thread.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/bind.hpp>
#include <boost/filesystem.hpp>
#include <boost/program_options.hpp>
@@ -130,8 +131,8 @@ void handleAckComplete(Result) {
}
-boost::mutex mutex;
-typedef boost::unique_lock<boost::mutex> Lock;
+std::mutex mutex;
+typedef std::unique_lock<std::mutex> Lock;
typedef accumulator_set<uint64_t, stats<tag::mean, tag::p_square_quantile> > LatencyAccumulator;
LatencyAccumulator e2eLatencyAccumulator(quantile_probability = 0.99);
diff --git a/pulsar-client-cpp/perf/PerfProducer.cc b/pulsar-client-cpp/perf/PerfProducer.cc
index 8f9704b..e36728b 100644
--- a/pulsar-client-cpp/perf/PerfProducer.cc
+++ b/pulsar-client-cpp/perf/PerfProducer.cc
@@ -19,7 +19,8 @@
#include <lib/LogUtils.h>
DECLARE_LOG_OBJECT()
-#include <boost/thread.hpp>
+#include <mutex>
+
#include <boost/bind.hpp>
#include <boost/filesystem.hpp>
@@ -29,9 +30,10 @@ DECLARE_LOG_OBJECT()
#include <boost/accumulators/statistics/p_square_quantile.hpp>
#include <boost/program_options/variables_map.hpp>
#include <boost/program_options.hpp>
-#include <boost/thread.hpp>
+#include <thread>
namespace po = boost::program_options;
+#include <atomic>
#include <iostream>
#include <fstream>
#include <vector>
@@ -117,10 +119,10 @@ class EncKeyReader: public CryptoKeyReader {
typedef accumulator_set<uint64_t, stats<tag::mean, tag::p_square_quantile> > LatencyAccumulator;
LatencyAccumulator e2eLatencyAccumulator(quantile_probability = 0.99);
std::vector<pulsar::Producer> producerList;
-std::vector<boost::thread> threadList;
+std::vector<std::thread> threadList;
-boost::mutex mutex;
-typedef boost::unique_lock<boost::mutex> Lock;
+std::mutex mutex;
+typedef std::unique_lock<std::mutex> Lock;
typedef std::chrono::high_resolution_clock Clock;
@@ -136,29 +138,30 @@ void sendCallback(pulsar::Result result, const pulsar::Message& msg, Clock::time
// Start a pulsar producer on a topic and keep producing messages
void runProducer(const Arguments& args, std::string topicName, int threadIndex,
- RateLimiterPtr limiter, pulsar::Producer& producer) {
+ RateLimiterPtr limiter, pulsar::Producer& producer,
+ const std::atomic<bool>& exitCondition) {
LOG_INFO("Producing messages for topic = " << topicName << ", threadIndex = " << threadIndex);
- boost::scoped_array<char> payload(new char[args.msgSize]);
+ std::unique_ptr<char[]> payload(new char[args.msgSize]);
memset(payload.get(), 0, args.msgSize);
pulsar::MessageBuilder builder;
- try {
- while (true) {
- if (args.rate != -1) {
- limiter->acquire();
- }
- pulsar::Message msg = builder.create().setAllocatedContent(payload.get(), args.msgSize).build();
- producer.sendAsync(msg, boost::bind(sendCallback, _1, _2, Clock::now()));
- boost::this_thread::interruption_point();
+ while (true) {
+ if (args.rate != -1) {
+ limiter->acquire();
+ }
+ pulsar::Message msg = builder.create().setAllocatedContent(payload.get(), args.msgSize).build();
+
+ producer.sendAsync(msg, boost::bind(sendCallback, _1, _2, Clock::now()));
+ if (exitCondition) {
+ LOG_INFO("Thread interrupted. Exiting producer thread.");
+ break;
}
- } catch(const boost::thread_interrupted& e) {
- // Thread interruption request received, break the loop
- LOG_INFO("Thread interrupted. Exiting thread.");
}
}
-void startPerfProducer(const Arguments& args, pulsar::ProducerConfiguration &producerConf, pulsar::Client &client) {
+void startPerfProducer(const Arguments& args, pulsar::ProducerConfiguration &producerConf, pulsar::Client &client,
+ const std::atomic<bool>& exitCondition) {
RateLimiterPtr limiter;
if (args.rate != -1) {
limiter = std::make_shared<pulsar::RateLimiter>(args.rate);
@@ -181,7 +184,9 @@ void startPerfProducer(const Arguments& args, pulsar::ProducerConfiguration &pro
}
for (int k = 0; k < args.numOfThreadsPerProducer; k++) {
- threadList.push_back(boost::thread(boost::bind(runProducer, args, topic, k, limiter, producerList[i*args.numProducers + j])));
+ threadList.push_back(std::thread(
+ boost::bind(runProducer, args, topic, k, limiter, producerList[i * args.numProducers + j],
+ std::cref(exitCondition))));
}
}
}
@@ -364,7 +369,9 @@ int main(int argc, char** argv) {
}
pulsar::Client client(pulsar::PulsarFriend::getClient(args.serviceURL, conf, false));
- startPerfProducer(args, producerConf, client);
+
+ std::atomic<bool> exitCondition;
+ startPerfProducer(args, producerConf, client, exitCondition);
Clock::time_point oldTime = Clock::now();
unsigned long totalMessagesProduced = 0;
@@ -392,9 +399,9 @@ int main(int argc, char** argv) {
oldTime = now;
}
LOG_INFO("Total messagesProduced = " << totalMessagesProduced + messagesProduced);
- for (int i = 0; i < threadList.size(); i++) {
- threadList[i].interrupt();
- threadList[i].join();
+ exitCondition = true;
+ for (auto& thread : threadList) {
+ thread.join();
}
// Waiting for the sendCallbacks To Complete
usleep(2 * 1000 * 1000);
diff --git a/pulsar-client-cpp/perf/RateLimiter.h b/pulsar-client-cpp/perf/RateLimiter.h
index 5fa8f81..6812a4d 100644
--- a/pulsar-client-cpp/perf/RateLimiter.h
+++ b/pulsar-client-cpp/perf/RateLimiter.h
@@ -21,7 +21,7 @@
#include <chrono>
#include <thread>
-#include <boost/thread/mutex.hpp>
+#include <mutex>
namespace pulsar {
@@ -42,8 +42,8 @@ class RateLimiter {
long storedPermits_;
double maxPermits_;
Clock::time_point nextFree_;
- boost::mutex mutex_;
- typedef boost::unique_lock<boost::mutex> Lock;
+ std::mutex mutex_;
+ typedef std::unique_lock<std::mutex> Lock;
};
RateLimiter::RateLimiter(double rate)
diff --git a/pulsar-client-cpp/tests/AuthPluginTest.cc b/pulsar-client-cpp/tests/AuthPluginTest.cc
index 75683a8..b2169e0 100644
--- a/pulsar-client-cpp/tests/AuthPluginTest.cc
+++ b/pulsar-client-cpp/tests/AuthPluginTest.cc
@@ -21,7 +21,7 @@
#include <pulsar/Client.h>
#include <boost/asio.hpp>
#include <boost/algorithm/string.hpp>
-#include <boost/thread.hpp>
+#include <thread>
#include <lib/LogUtils.h>
#include "lib/Future.h"
@@ -196,7 +196,7 @@ void mockZTS(int port) {
} // namespace testAthenz
TEST(AuthPluginTest, testAthenz) {
- boost::thread zts(boost::bind(&testAthenz::mockZTS, 9999));
+ std::thread zts(std::bind(&testAthenz::mockZTS, 9999));
pulsar::AuthenticationDataPtr data;
std::string params = R"({
"tenantDomain": "pulsar.test.tenant",
@@ -266,7 +266,7 @@ TEST(AuthPluginTest, testAuthFactoryTls) {
}
TEST(AuthPluginTest, testAuthFactoryAthenz) {
- boost::thread zts(boost::bind(&testAthenz::mockZTS, 9998));
+ std::thread zts(std::bind(&testAthenz::mockZTS, 9998));
pulsar::AuthenticationDataPtr data;
std::string params = R"({
"tenantDomain": "pulsar.test2.tenant",
diff --git a/pulsar-client-cpp/tests/AuthTokenTest.cc b/pulsar-client-cpp/tests/AuthTokenTest.cc
index dad9a0e..1b136b0 100644
--- a/pulsar-client-cpp/tests/AuthTokenTest.cc
+++ b/pulsar-client-cpp/tests/AuthTokenTest.cc
@@ -23,7 +23,6 @@
#include <pulsar/Client.h>
#include <boost/asio.hpp>
#include <boost/algorithm/string.hpp>
-#include <boost/thread.hpp>
#include <lib/LogUtils.h>
#include <string>
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 17f01b4..4f04cb7 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -24,10 +24,8 @@
#include <lib/Latch.h>
#include <sstream>
#include "boost/date_time/posix_time/posix_time.hpp"
-#include "boost/enable_shared_from_this.hpp"
#include "CustomRoutingPolicy.h"
-#include <boost/thread.hpp>
-#include <boost/thread/mutex.hpp>
+#include <mutex>
#include <lib/TopicName.h>
#include "PulsarFriend.h"
#include "HttpHelper.h"
@@ -41,7 +39,7 @@ DECLARE_LOG_OBJECT()
using namespace pulsar;
-boost::mutex mutex_;
+std::mutex mutex_;
static int globalTestBatchMessagesCounter = 0;
static int globalCount = 0;
static long globalResendMessageCount = 0;
@@ -60,7 +58,7 @@ static void messageListenerFunctionWithoutAck(Consumer consumer, const Message&
}
static void sendCallBack(Result r, const Message& msg, std::string prefix, int* count) {
- static boost::mutex sendMutex_;
+ static std::mutex sendMutex_;
sendMutex_.lock();
ASSERT_EQ(r, ResultOk);
std::string messageContent = prefix + std::to_string(*count);
@@ -72,7 +70,7 @@ static void sendCallBack(Result r, const Message& msg, std::string prefix, int*
static void receiveCallBack(Result r, const Message& msg, std::string& messageContent, bool checkContent,
bool* isFailed, int* count) {
- static boost::mutex receiveMutex_;
+ static std::mutex receiveMutex_;
receiveMutex_.lock();
if (r == ResultOk) {
@@ -1525,7 +1523,7 @@ TEST(BasicEndToEndTest, testUnAckedMessageTimeoutListener) {
result = producer.send(msg);
ASSERT_EQ(ResultOk, result);
- ASSERT_TRUE(latch.wait(milliseconds(30 * 1000)));
+ ASSERT_TRUE(latch.wait(std::chrono::seconds(30)));
ASSERT_GE(globalCount, 2);
consumer.unsubscribe();
diff --git a/pulsar-client-cpp/tests/BatchMessageTest.cc b/pulsar-client-cpp/tests/BatchMessageTest.cc
index de19ed9..3ebcabb 100644
--- a/pulsar-client-cpp/tests/BatchMessageTest.cc
+++ b/pulsar-client-cpp/tests/BatchMessageTest.cc
@@ -25,7 +25,6 @@
#include <sstream>
#include "boost/date_time/posix_time/posix_time.hpp"
#include "CustomRoutingPolicy.h"
-#include <boost/thread.hpp>
#include "lib/Future.h"
#include "lib/Utils.h"
#include <ctime>
diff --git a/pulsar-client-cpp/tests/BlockingQueueTest.cc b/pulsar-client-cpp/tests/BlockingQueueTest.cc
index 7073b40..4047e5e 100644
--- a/pulsar-client-cpp/tests/BlockingQueueTest.cc
+++ b/pulsar-client-cpp/tests/BlockingQueueTest.cc
@@ -19,20 +19,17 @@
#include <gtest/gtest.h>
#include <lib/BlockingQueue.h>
-#include <boost/thread.hpp>
-#include <boost/date_time/posix_time/posix_time.hpp>
-
-using namespace boost::posix_time;
+#include <thread>
class ProducerWorker {
private:
- boost::thread producerThread_;
+ std::thread producerThread_;
BlockingQueue<int>& queue_;
public:
ProducerWorker(BlockingQueue<int>& queue) : queue_(queue) {}
- void produce(int number) { producerThread_ = boost::thread(&ProducerWorker::pushNumbers, this, number); }
+ void produce(int number) { producerThread_ = std::thread(&ProducerWorker::pushNumbers, this, number); }
void pushNumbers(int number) {
for (int i = 1; i <= number; i++) {
@@ -45,13 +42,13 @@ class ProducerWorker {
class ConsumerWorker {
private:
- boost::thread consumerThread_;
+ std::thread consumerThread_;
BlockingQueue<int>& queue_;
public:
ConsumerWorker(BlockingQueue<int>& queue) : queue_(queue) {}
- void consume(int number) { consumerThread_ = boost::thread(&ConsumerWorker::popNumbers, this, number); }
+ void consume(int number) { consumerThread_ = std::thread(&ConsumerWorker::popNumbers, this, number); }
void popNumbers(int number) {
for (int i = 1; i <= number; i++) {
@@ -148,8 +145,8 @@ TEST(BlockingQueueTest, testTimeout) {
size_t size = 5;
BlockingQueue<int> queue(size);
int value;
- bool popReturn = queue.pop(value, seconds(1));
- boost::this_thread::sleep(seconds(2));
+ bool popReturn = queue.pop(value, std::chrono::seconds(1));
+ std::this_thread::sleep_for(std::chrono::seconds(2));
ASSERT_FALSE(popReturn);
}
diff --git a/pulsar-client-cpp/tests/ConsumerStatsTest.cc b/pulsar-client-cpp/tests/ConsumerStatsTest.cc
index be24863..133f94c 100644
--- a/pulsar-client-cpp/tests/ConsumerStatsTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerStatsTest.cc
@@ -20,9 +20,7 @@
#include <pulsar/Client.h>
#include <lib/LogUtils.h>
#include <lib/Commands.h>
-#include "boost/date_time/posix_time/posix_time.hpp"
#include "CustomRoutingPolicy.h"
-#include <boost/thread.hpp>
#include "lib/Future.h"
#include "lib/Utils.h"
#include "PulsarFriend.h"
@@ -306,5 +304,5 @@ TEST(ConsumerStatsTest, testAsyncCallOnPartitionedTopic) {
consumer.getBrokerConsumerStatsAsync(boost::bind(partitionedCallbackFunction, _1, _2, 10, latch, 3));
// Wait for ten seconds only
- ASSERT_TRUE(latch.wait(milliseconds(10 * 1000)));
+ ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
}
diff --git a/pulsar-client-cpp/tests/LatchTest.cc b/pulsar-client-cpp/tests/LatchTest.cc
index 38e19ee..9f332fb 100644
--- a/pulsar-client-cpp/tests/LatchTest.cc
+++ b/pulsar-client-cpp/tests/LatchTest.cc
@@ -18,30 +18,28 @@
*/
#include <gtest/gtest.h>
#include <lib/Latch.h>
-#include <boost/thread.hpp>
-#include <boost/date_time/posix_time/posix_time.hpp>
+#include <thread>
#include "LogUtils.h"
DECLARE_LOG_OBJECT()
using namespace pulsar;
-using namespace boost::posix_time;
class Service {
private:
std::string serviceName_;
- time_duration sleepDuration_;
+ std::chrono::milliseconds sleepDuration_;
Latch latch_;
- boost::thread thread_;
+ std::thread thread_;
public:
- Service(const std::string& serviceName, time_duration sleepDuration, const Latch& latch)
+ Service(const std::string& serviceName, std::chrono::milliseconds sleepDuration, const Latch& latch)
: serviceName_(serviceName), sleepDuration_(sleepDuration), latch_(latch) {
- thread_ = boost::thread(&Service::run, this);
+ thread_ = std::thread(&Service::run, this);
}
void run() {
- boost::this_thread::sleep(sleepDuration_);
+ std::this_thread::sleep_for(sleepDuration_);
LOG_INFO("Service " << serviceName_ << " is up");
latch_.countdown();
}
@@ -51,17 +49,17 @@ class Service {
TEST(LatchTest, testCountDown) {
Latch latch(3);
- Service service1("service1", millisec(50), latch);
- Service service2("service2", millisec(30), latch);
- Service service3("service3", millisec(20), latch);
+ Service service1("service1", std::chrono::milliseconds(50), latch);
+ Service service2("service2", std::chrono::milliseconds(30), latch);
+ Service service3("service3", std::chrono::milliseconds(20), latch);
latch.wait();
}
TEST(LatchTest, testLatchCount) {
Latch latch(3);
- Service service1("service1", millisec(50), latch);
- Service service2("service2", millisec(30), latch);
- Service service3("service3", millisec(20), latch);
+ Service service1("service1", std::chrono::milliseconds(50), latch);
+ Service service2("service2", std::chrono::milliseconds(30), latch);
+ Service service3("service3", std::chrono::milliseconds(20), latch);
ASSERT_EQ(3, latch.getCount());
latch.wait();
ASSERT_EQ(0, latch.getCount());
@@ -70,17 +68,17 @@ TEST(LatchTest, testLatchCount) {
TEST(LatchTest, testTimedWait) {
// Wait for 7 seconds which is more than the maximum sleep time (5 seconds)
Latch latch1(3);
- Service service1("service1", millisec(50), latch1);
- Service service2("service2", millisec(30), latch1);
- Service service3("service3", millisec(50), latch1);
- ASSERT_TRUE(latch1.wait(millisec(70)));
+ Service service1("service1", std::chrono::milliseconds(50), latch1);
+ Service service2("service2", std::chrono::milliseconds(30), latch1);
+ Service service3("service3", std::chrono::milliseconds(50), latch1);
+ ASSERT_TRUE(latch1.wait(std::chrono::milliseconds(70)));
// Wait for 3 seconds which is less than the maximum sleep time (5 seconds)
Latch latch2(3);
- Service service4("service4", millisec(50), latch2);
- Service service5("service5", millisec(30), latch2);
- Service service6("service6", millisec(50), latch2);
- ASSERT_FALSE(latch2.wait(millisec(30)));
+ Service service4("service4", std::chrono::milliseconds(50), latch2);
+ Service service5("service5", std::chrono::milliseconds(30), latch2);
+ Service service6("service6", std::chrono::milliseconds(50), latch2);
+ ASSERT_FALSE(latch2.wait(std::chrono::milliseconds(30)));
// After the assert is passed and Service is destroyed because of join, the
// main thread would not exit until service4 thread is returned.
diff --git a/pulsar-client-cpp/tests/ZeroQueueSizeTest.cc b/pulsar-client-cpp/tests/ZeroQueueSizeTest.cc
index 890cee6..3b22a30 100644
--- a/pulsar-client-cpp/tests/ZeroQueueSizeTest.cc
+++ b/pulsar-client-cpp/tests/ZeroQueueSizeTest.cc
@@ -105,7 +105,7 @@ TEST(ZeroQueueSizeTest, testMessageListener) {
ASSERT_EQ(ResultOk, result);
}
- ASSERT_TRUE(latch.wait(milliseconds(30 * 1000)));
+ ASSERT_TRUE(latch.wait(std::chrono::seconds(30)));
ASSERT_EQ(globalCount, totalMessages);
consumer.unsubscribe();