You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by hc...@apache.org on 2014/11/18 10:02:40 UTC
[12/37] thrift git commit: THRIFT-2729: C++ - .clang-format created and applied
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp b/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp
index c45a964..5f6dade 100644
--- a/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp
+++ b/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp
@@ -29,7 +29,9 @@
#include <boost/weak_ptr.hpp>
#include <boost/thread.hpp>
-namespace apache { namespace thrift { namespace concurrency {
+namespace apache {
+namespace thrift {
+namespace concurrency {
using boost::shared_ptr;
using boost::weak_ptr;
@@ -39,38 +41,29 @@ using boost::weak_ptr;
*
* @version $Id:$
*/
-class BoostThread: public Thread {
- public:
-
- enum STATE {
- uninitialized,
- starting,
- started,
- stopping,
- stopped
- };
+class BoostThread : public Thread {
+public:
+ enum STATE { uninitialized, starting, started, stopping, stopped };
static void* threadMain(void* arg);
- private:
+private:
std::auto_ptr<boost::thread> thread_;
STATE state_;
weak_ptr<BoostThread> self_;
bool detached_;
- public:
-
- BoostThread(bool detached, shared_ptr<Runnable> runnable) :
- state_(uninitialized),
- detached_(detached) {
- this->Thread::runnable(runnable);
- }
+public:
+ BoostThread(bool detached, shared_ptr<Runnable> runnable)
+ : state_(uninitialized), detached_(detached) {
+ this->Thread::runnable(runnable);
+ }
~BoostThread() {
- if(!detached_) {
+ if (!detached_) {
try {
join();
- } catch(...) {
+ } catch (...) {
// We're really hosed.
}
}
@@ -81,15 +74,16 @@ class BoostThread: public Thread {
return;
}
- // Create reference
+ // Create reference
shared_ptr<BoostThread>* selfRef = new shared_ptr<BoostThread>();
*selfRef = self_.lock();
state_ = starting;
- thread_ = std::auto_ptr<boost::thread>(new boost::thread(boost::bind(threadMain, (void*)selfRef)));
+ thread_
+ = std::auto_ptr<boost::thread>(new boost::thread(boost::bind(threadMain, (void*)selfRef)));
- if(detached_)
+ if (detached_)
thread_->detach();
}
@@ -99,9 +93,7 @@ class BoostThread: public Thread {
}
}
- Thread::id_t getId() {
- return thread_.get() ? thread_->get_id() : boost::thread::id();
- }
+ Thread::id_t getId() { return thread_.get() ? thread_->get_id() : boost::thread::id(); }
shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
@@ -139,13 +131,11 @@ void* BoostThread::threadMain(void* arg) {
*/
class BoostThreadFactory::Impl {
- private:
+private:
bool detached_;
- public:
-
- Impl(bool detached) :
- detached_(detached) {}
+public:
+ Impl(bool detached) : detached_(detached) {}
/**
* Creates a new POSIX thread to run the runnable object
@@ -163,22 +153,30 @@ class BoostThreadFactory::Impl {
void setDetached(bool value) { detached_ = value; }
- Thread::id_t getCurrentThreadId() const {
- return boost::this_thread::get_id();
- }
+ Thread::id_t getCurrentThreadId() const { return boost::this_thread::get_id(); }
};
-BoostThreadFactory::BoostThreadFactory(bool detached) :
- impl_(new BoostThreadFactory::Impl(detached)) {}
-
-shared_ptr<Thread> BoostThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return impl_->newThread(runnable); }
+BoostThreadFactory::BoostThreadFactory(bool detached)
+ : impl_(new BoostThreadFactory::Impl(detached)) {
+}
-bool BoostThreadFactory::isDetached() const { return impl_->isDetached(); }
+shared_ptr<Thread> BoostThreadFactory::newThread(shared_ptr<Runnable> runnable) const {
+ return impl_->newThread(runnable);
+}
-void BoostThreadFactory::setDetached(bool value) { impl_->setDetached(value); }
+bool BoostThreadFactory::isDetached() const {
+ return impl_->isDetached();
+}
-Thread::id_t BoostThreadFactory::getCurrentThreadId() const { return impl_->getCurrentThreadId(); }
+void BoostThreadFactory::setDetached(bool value) {
+ impl_->setDetached(value);
+}
-}}} // apache::thrift::concurrency
+Thread::id_t BoostThreadFactory::getCurrentThreadId() const {
+ return impl_->getCurrentThreadId();
+}
+}
+}
+} // apache::thrift::concurrency
#endif // USE_BOOST_THREAD
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h b/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h
index 6a236d3..fc06e56 100644
--- a/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h
+++ b/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h
@@ -24,7 +24,9 @@
#include <boost/shared_ptr.hpp>
-namespace apache { namespace thrift { namespace concurrency {
+namespace apache {
+namespace thrift {
+namespace concurrency {
/**
* A thread factory to create posix threads
@@ -33,21 +35,21 @@ namespace apache { namespace thrift { namespace concurrency {
*/
class BoostThreadFactory : public ThreadFactory {
- public:
-
+public:
/**
* Boost thread factory. All threads created by a factory are reference-counted
* via boost::shared_ptr and boost::weak_ptr. The factory guarantees that threads and
* the Runnable tasks they host will be properly cleaned up once the last strong reference
* to both is given up.
*
- * Threads are created with the specified boost policy, priority, stack-size. A detachable thread is not
+ * Threads are created with the specified boost policy, priority, stack-size. A detachable thread
+ *is not
* joinable.
*
* By default threads are not joinable.
*/
- BoostThreadFactory(bool detached=true);
+ BoostThreadFactory(bool detached = true);
// From ThreadFactory;
boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const;
@@ -69,7 +71,8 @@ private:
class Impl;
boost::shared_ptr<Impl> impl_;
};
-
-}}} // apache::thrift::concurrency
+}
+}
+} // apache::thrift::concurrency
#endif // #ifndef _THRIFT_CONCURRENCY_BOOSTTHREADFACTORY_H_
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/concurrency/Exception.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/Exception.h b/lib/cpp/src/thrift/concurrency/Exception.h
index c62f116..6438fda 100644
--- a/lib/cpp/src/thrift/concurrency/Exception.h
+++ b/lib/cpp/src/thrift/concurrency/Exception.h
@@ -23,7 +23,9 @@
#include <exception>
#include <thrift/Thrift.h>
-namespace apache { namespace thrift { namespace concurrency {
+namespace apache {
+namespace thrift {
+namespace concurrency {
class NoSuchTaskException : public apache::thrift::TException {};
@@ -39,26 +41,24 @@ public:
class TimedOutException : public apache::thrift::TException {
public:
- TimedOutException():TException("TimedOutException"){};
- TimedOutException(const std::string& message ) :
- TException(message) {}
+ TimedOutException() : TException("TimedOutException"){};
+ TimedOutException(const std::string& message) : TException(message) {}
};
class TooManyPendingTasksException : public apache::thrift::TException {
public:
- TooManyPendingTasksException():TException("TooManyPendingTasksException"){};
- TooManyPendingTasksException(const std::string& message ) :
- TException(message) {}
+ TooManyPendingTasksException() : TException("TooManyPendingTasksException"){};
+ TooManyPendingTasksException(const std::string& message) : TException(message) {}
};
class SystemResourceException : public apache::thrift::TException {
public:
- SystemResourceException() {}
+ SystemResourceException() {}
- SystemResourceException(const std::string& message) :
- TException(message) {}
+ SystemResourceException(const std::string& message) : TException(message) {}
};
-
-}}} // apache::thrift::concurrency
+}
+}
+} // apache::thrift::concurrency
#endif // #ifndef _THRIFT_CONCURRENCY_EXCEPTION_H_
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/concurrency/FunctionRunner.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/FunctionRunner.h b/lib/cpp/src/thrift/concurrency/FunctionRunner.h
index e3b2bf3..b776794 100644
--- a/lib/cpp/src/thrift/concurrency/FunctionRunner.h
+++ b/lib/cpp/src/thrift/concurrency/FunctionRunner.h
@@ -23,7 +23,9 @@
#include <thrift/cxxfunctional.h>
#include <thrift/concurrency/Thread.h>
-namespace apache { namespace thrift { namespace concurrency {
+namespace apache {
+namespace thrift {
+namespace concurrency {
/**
* Convenient implementation of Runnable that will execute arbitrary callbacks.
@@ -47,9 +49,9 @@ namespace apache { namespace thrift { namespace concurrency {
*/
class FunctionRunner : public Runnable {
- public:
+public:
// This is the type of callback 'pthread_create()' expects.
- typedef void* (*PthreadFuncPtr)(void *arg);
+ typedef void* (*PthreadFuncPtr)(void* arg);
// This a fully-generic void(void) callback for custom bindings.
typedef apache::thrift::stdcxx::function<void()> VoidFunc;
@@ -63,32 +65,28 @@ class FunctionRunner : public Runnable {
return boost::shared_ptr<FunctionRunner>(new FunctionRunner(cob));
}
- static boost::shared_ptr<FunctionRunner> create(PthreadFuncPtr func,
- void* arg) {
+ static boost::shared_ptr<FunctionRunner> create(PthreadFuncPtr func, void* arg) {
return boost::shared_ptr<FunctionRunner>(new FunctionRunner(func, arg));
}
private:
- static void pthread_func_wrapper(PthreadFuncPtr func, void *arg)
- {
- //discard return value
+ static void pthread_func_wrapper(PthreadFuncPtr func, void* arg) {
+ // discard return value
func(arg);
}
+
public:
/**
* Given a 'pthread_create' style callback, this FunctionRunner will
* execute the given callback. Note that the 'void*' return value is ignored.
*/
FunctionRunner(PthreadFuncPtr func, void* arg)
- : func_(apache::thrift::stdcxx::bind(pthread_func_wrapper, func, arg))
- { }
+ : func_(apache::thrift::stdcxx::bind(pthread_func_wrapper, func, arg)) {}
/**
* Given a generic callback, this FunctionRunner will execute it.
*/
- FunctionRunner(const VoidFunc& cob)
- : func_(cob)
- { }
+ FunctionRunner(const VoidFunc& cob) : func_(cob) {}
/**
* Given a bool foo(...) type callback, FunctionRunner will execute
@@ -96,26 +94,25 @@ public:
* until it returns false. Note that the actual interval between calls will
* be intervalMs plus execution time of the callback.
*/
- FunctionRunner(const BoolFunc& cob, int intervalMs)
- : repFunc_(cob), intervalMs_(intervalMs)
- { }
+ FunctionRunner(const BoolFunc& cob, int intervalMs) : repFunc_(cob), intervalMs_(intervalMs) {}
void run() {
if (repFunc_) {
- while(repFunc_()) {
- THRIFT_SLEEP_USEC(intervalMs_*1000);
+ while (repFunc_()) {
+ THRIFT_SLEEP_USEC(intervalMs_ * 1000);
}
} else {
func_();
}
}
- private:
+private:
VoidFunc func_;
BoolFunc repFunc_;
int intervalMs_;
};
-
-}}} // apache::thrift::concurrency
+}
+}
+} // apache::thrift::concurrency
#endif // #ifndef _THRIFT_CONCURRENCY_FUNCTION_RUNNER_H
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/concurrency/Monitor.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/Monitor.cpp b/lib/cpp/src/thrift/concurrency/Monitor.cpp
index d94b2a4..5e713c0 100644
--- a/lib/cpp/src/thrift/concurrency/Monitor.cpp
+++ b/lib/cpp/src/thrift/concurrency/Monitor.cpp
@@ -30,7 +30,9 @@
#include <pthread.h>
-namespace apache { namespace thrift { namespace concurrency {
+namespace apache {
+namespace thrift {
+namespace concurrency {
using boost::scoped_ptr;
@@ -41,26 +43,14 @@ using boost::scoped_ptr;
*/
class Monitor::Impl {
- public:
-
- Impl()
- : ownedMutex_(new Mutex()),
- mutex_(NULL),
- condInitialized_(false) {
+public:
+ Impl() : ownedMutex_(new Mutex()), mutex_(NULL), condInitialized_(false) {
init(ownedMutex_.get());
}
- Impl(Mutex* mutex)
- : mutex_(NULL),
- condInitialized_(false) {
- init(mutex);
- }
+ Impl(Mutex* mutex) : mutex_(NULL), condInitialized_(false) { init(mutex); }
- Impl(Monitor* monitor)
- : mutex_(NULL),
- condInitialized_(false) {
- init(&(monitor->mutex()));
- }
+ Impl(Monitor* monitor) : mutex_(NULL), condInitialized_(false) { init(&(monitor->mutex())); }
~Impl() { cleanup(); }
@@ -80,11 +70,10 @@ class Monitor::Impl {
if (result == THRIFT_ETIMEDOUT) {
// pthread_cond_timedwait has been observed to return early on
// various platforms, so comment out this assert.
- //assert(Util::currentTime() >= (now + timeout));
+ // assert(Util::currentTime() >= (now + timeout));
throw TimedOutException();
} else if (result != 0) {
- throw TException(
- "pthread_cond_wait() or pthread_cond_timedwait() failed");
+ throw TException("pthread_cond_wait() or pthread_cond_timedwait() failed");
}
}
@@ -110,19 +99,16 @@ class Monitor::Impl {
*/
int waitForTime(const THRIFT_TIMESPEC* abstime) const {
assert(mutex_);
- pthread_mutex_t* mutexImpl =
- reinterpret_cast<pthread_mutex_t*>(mutex_->getUnderlyingImpl());
+ pthread_mutex_t* mutexImpl = reinterpret_cast<pthread_mutex_t*>(mutex_->getUnderlyingImpl());
assert(mutexImpl);
// XXX Need to assert that caller owns mutex
- return pthread_cond_timedwait(&pthread_cond_,
- mutexImpl,
- abstime);
+ return pthread_cond_timedwait(&pthread_cond_, mutexImpl, abstime);
}
int waitForTime(const struct timeval* abstime) const {
struct THRIFT_TIMESPEC temp;
- temp.tv_sec = abstime->tv_sec;
+ temp.tv_sec = abstime->tv_sec;
temp.tv_nsec = abstime->tv_usec * 1000;
return waitForTime(&temp);
}
@@ -132,13 +118,11 @@ class Monitor::Impl {
*/
int waitForever() const {
assert(mutex_);
- pthread_mutex_t* mutexImpl =
- reinterpret_cast<pthread_mutex_t*>(mutex_->getUnderlyingImpl());
+ pthread_mutex_t* mutexImpl = reinterpret_cast<pthread_mutex_t*>(mutex_->getUnderlyingImpl());
assert(mutexImpl);
return pthread_cond_wait(&pthread_cond_, mutexImpl);
}
-
void notify() {
// XXX Need to assert that caller owns mutex
int iret = pthread_cond_signal(&pthread_cond_);
@@ -153,8 +137,7 @@ class Monitor::Impl {
assert(iret == 0);
}
- private:
-
+private:
void init(Mutex* mutex) {
mutex_ = mutex;
@@ -184,19 +167,32 @@ class Monitor::Impl {
mutable bool condInitialized_;
};
-Monitor::Monitor() : impl_(new Monitor::Impl()) {}
-Monitor::Monitor(Mutex* mutex) : impl_(new Monitor::Impl(mutex)) {}
-Monitor::Monitor(Monitor* monitor) : impl_(new Monitor::Impl(monitor)) {}
+Monitor::Monitor() : impl_(new Monitor::Impl()) {
+}
+Monitor::Monitor(Mutex* mutex) : impl_(new Monitor::Impl(mutex)) {
+}
+Monitor::Monitor(Monitor* monitor) : impl_(new Monitor::Impl(monitor)) {
+}
-Monitor::~Monitor() { delete impl_; }
+Monitor::~Monitor() {
+ delete impl_;
+}
-Mutex& Monitor::mutex() const { return impl_->mutex(); }
+Mutex& Monitor::mutex() const {
+ return impl_->mutex();
+}
-void Monitor::lock() const { impl_->lock(); }
+void Monitor::lock() const {
+ impl_->lock();
+}
-void Monitor::unlock() const { impl_->unlock(); }
+void Monitor::unlock() const {
+ impl_->unlock();
+}
-void Monitor::wait(int64_t timeout) const { impl_->wait(timeout); }
+void Monitor::wait(int64_t timeout) const {
+ impl_->wait(timeout);
+}
int Monitor::waitForTime(const THRIFT_TIMESPEC* abstime) const {
return impl_->waitForTime(abstime);
@@ -214,8 +210,13 @@ int Monitor::waitForever() const {
return impl_->waitForever();
}
-void Monitor::notify() const { impl_->notify(); }
-
-void Monitor::notifyAll() const { impl_->notifyAll(); }
+void Monitor::notify() const {
+ impl_->notify();
+}
-}}} // apache::thrift::concurrency
+void Monitor::notifyAll() const {
+ impl_->notifyAll();
+}
+}
+}
+} // apache::thrift::concurrency
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/concurrency/Monitor.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/Monitor.h b/lib/cpp/src/thrift/concurrency/Monitor.h
index 811e0e1..5472f85 100644
--- a/lib/cpp/src/thrift/concurrency/Monitor.h
+++ b/lib/cpp/src/thrift/concurrency/Monitor.h
@@ -25,8 +25,9 @@
#include <boost/utility.hpp>
-
-namespace apache { namespace thrift { namespace concurrency {
+namespace apache {
+namespace thrift {
+namespace concurrency {
/**
* A monitor is a combination mutex and condition-event. Waiting and
@@ -47,7 +48,7 @@ namespace apache { namespace thrift { namespace concurrency {
* @version $Id:$
*/
class Monitor : boost::noncopyable {
- public:
+public:
/** Creates a new mutex, and takes ownership of it. */
Monitor();
@@ -101,30 +102,28 @@ class Monitor : boost::noncopyable {
*/
void wait(int64_t timeout_ms = 0LL) const;
-
/** Wakes up one thread waiting on this monitor. */
virtual void notify() const;
/** Wakes up all waiting threads on this monitor. */
virtual void notifyAll() const;
- private:
-
+private:
class Impl;
Impl* impl_;
};
class Synchronized {
- public:
- Synchronized(const Monitor* monitor) : g(monitor->mutex()) { }
- Synchronized(const Monitor& monitor) : g(monitor.mutex()) { }
+public:
+ Synchronized(const Monitor* monitor) : g(monitor->mutex()) {}
+ Synchronized(const Monitor& monitor) : g(monitor.mutex()) {}
- private:
+private:
Guard g;
};
-
-
-}}} // apache::thrift::concurrency
+}
+}
+} // apache::thrift::concurrency
#endif // #ifndef _THRIFT_CONCURRENCY_MONITOR_H_
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/concurrency/Mutex.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/Mutex.cpp b/lib/cpp/src/thrift/concurrency/Mutex.cpp
index 3f7bb5b..d9921aa 100644
--- a/lib/cpp/src/thrift/concurrency/Mutex.cpp
+++ b/lib/cpp/src/thrift/concurrency/Mutex.cpp
@@ -31,7 +31,9 @@
using boost::shared_ptr;
-namespace apache { namespace thrift { namespace concurrency {
+namespace apache {
+namespace thrift {
+namespace concurrency {
#ifndef THRIFT_NO_CONTENTION_PROFILING
@@ -40,40 +42,38 @@ static MutexWaitCallback mutexProfilingCallback = 0;
volatile static sig_atomic_t mutexProfilingCounter = 0;
-void enableMutexProfiling(int32_t profilingSampleRate,
- MutexWaitCallback callback) {
+void enableMutexProfiling(int32_t profilingSampleRate, MutexWaitCallback callback) {
mutexProfilingSampleRate = profilingSampleRate;
mutexProfilingCallback = callback;
}
-#define PROFILE_MUTEX_START_LOCK() \
- int64_t _lock_startTime = maybeGetProfilingStartTime();
+#define PROFILE_MUTEX_START_LOCK() int64_t _lock_startTime = maybeGetProfilingStartTime();
-#define PROFILE_MUTEX_NOT_LOCKED() \
- do { \
- if (_lock_startTime > 0) { \
- int64_t endTime = Util::currentTimeUsec(); \
- (*mutexProfilingCallback)(this, endTime - _lock_startTime); \
- } \
+#define PROFILE_MUTEX_NOT_LOCKED() \
+ do { \
+ if (_lock_startTime > 0) { \
+ int64_t endTime = Util::currentTimeUsec(); \
+ (*mutexProfilingCallback)(this, endTime - _lock_startTime); \
+ } \
} while (0)
-#define PROFILE_MUTEX_LOCKED() \
- do { \
- profileTime_ = _lock_startTime; \
- if (profileTime_ > 0) { \
- profileTime_ = Util::currentTimeUsec() - profileTime_; \
- } \
+#define PROFILE_MUTEX_LOCKED() \
+ do { \
+ profileTime_ = _lock_startTime; \
+ if (profileTime_ > 0) { \
+ profileTime_ = Util::currentTimeUsec() - profileTime_; \
+ } \
} while (0)
-#define PROFILE_MUTEX_START_UNLOCK() \
- int64_t _temp_profileTime = profileTime_; \
+#define PROFILE_MUTEX_START_UNLOCK() \
+ int64_t _temp_profileTime = profileTime_; \
profileTime_ = 0;
-#define PROFILE_MUTEX_UNLOCKED() \
- do { \
- if (_temp_profileTime > 0) { \
- (*mutexProfilingCallback)(this, _temp_profileTime); \
- } \
+#define PROFILE_MUTEX_UNLOCKED() \
+ do { \
+ if (_temp_profileTime > 0) { \
+ (*mutexProfilingCallback)(this, _temp_profileTime); \
+ } \
} while (0)
static inline int64_t maybeGetProfilingStartTime() {
@@ -101,11 +101,11 @@ static inline int64_t maybeGetProfilingStartTime() {
}
#else
-# define PROFILE_MUTEX_START_LOCK()
-# define PROFILE_MUTEX_NOT_LOCKED()
-# define PROFILE_MUTEX_LOCKED()
-# define PROFILE_MUTEX_START_UNLOCK()
-# define PROFILE_MUTEX_UNLOCKED()
+#define PROFILE_MUTEX_START_LOCK()
+#define PROFILE_MUTEX_NOT_LOCKED()
+#define PROFILE_MUTEX_LOCKED()
+#define PROFILE_MUTEX_START_UNLOCK()
+#define PROFILE_MUTEX_UNLOCKED()
#endif // THRIFT_NO_CONTENTION_PROFILING
/**
@@ -114,7 +114,7 @@ static inline int64_t maybeGetProfilingStartTime() {
* @version $Id:$
*/
class Mutex::impl {
- public:
+public:
impl(Initializer init) : initialized_(false) {
#ifndef THRIFT_NO_CONTENTION_PROFILING
profileTime_ = 0;
@@ -182,9 +182,9 @@ class Mutex::impl {
PROFILE_MUTEX_UNLOCKED();
}
- void* getUnderlyingImpl() const { return (void*) &pthread_mutex_; }
+ void* getUnderlyingImpl() const { return (void*)&pthread_mutex_; }
- private:
+private:
mutable pthread_mutex_t pthread_mutex_;
mutable bool initialized_;
#ifndef THRIFT_NO_CONTENTION_PROFILING
@@ -192,17 +192,28 @@ class Mutex::impl {
#endif
};
-Mutex::Mutex(Initializer init) : impl_(new Mutex::impl(init)) {}
+Mutex::Mutex(Initializer init) : impl_(new Mutex::impl(init)) {
+}
-void* Mutex::getUnderlyingImpl() const { return impl_->getUnderlyingImpl(); }
+void* Mutex::getUnderlyingImpl() const {
+ return impl_->getUnderlyingImpl();
+}
-void Mutex::lock() const { impl_->lock(); }
+void Mutex::lock() const {
+ impl_->lock();
+}
-bool Mutex::trylock() const { return impl_->trylock(); }
+bool Mutex::trylock() const {
+ return impl_->trylock();
+}
-bool Mutex::timedlock(int64_t ms) const { return impl_->timedlock(ms); }
+bool Mutex::timedlock(int64_t ms) const {
+ return impl_->timedlock(ms);
+}
-void Mutex::unlock() const { impl_->unlock(); }
+void Mutex::unlock() const {
+ impl_->unlock();
+}
void Mutex::DEFAULT_INITIALIZER(void* arg) {
pthread_mutex_t* pthread_mutex = (pthread_mutex_t*)arg;
@@ -211,7 +222,8 @@ void Mutex::DEFAULT_INITIALIZER(void* arg) {
assert(ret == 0);
}
-#if defined(PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) || defined(PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP)
+#if defined(PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) \
+ || defined(PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP)
static void init_with_kind(pthread_mutex_t* mutex, int kind) {
pthread_mutexattr_t mutexattr;
int ret = pthread_mutexattr_init(&mutexattr);
@@ -250,7 +262,6 @@ void Mutex::RECURSIVE_INITIALIZER(void* arg) {
}
#endif
-
/**
* Implementation of ReadWriteMutex class using POSIX rw lock
*
@@ -269,7 +280,7 @@ public:
}
~impl() {
- if(initialized_) {
+ if (initialized_) {
initialized_ = false;
int ret = pthread_rwlock_destroy(&rw_lock_);
THRIFT_UNUSED_VARIABLE(ret);
@@ -280,7 +291,7 @@ public:
void acquireRead() const {
PROFILE_MUTEX_START_LOCK();
pthread_rwlock_rdlock(&rw_lock_);
- PROFILE_MUTEX_NOT_LOCKED(); // not exclusive, so use not-locked path
+ PROFILE_MUTEX_NOT_LOCKED(); // not exclusive, so use not-locked path
}
void acquireWrite() const {
@@ -307,22 +318,33 @@ private:
#endif
};
-ReadWriteMutex::ReadWriteMutex() : impl_(new ReadWriteMutex::impl()) {}
+ReadWriteMutex::ReadWriteMutex() : impl_(new ReadWriteMutex::impl()) {
+}
-void ReadWriteMutex::acquireRead() const { impl_->acquireRead(); }
+void ReadWriteMutex::acquireRead() const {
+ impl_->acquireRead();
+}
-void ReadWriteMutex::acquireWrite() const { impl_->acquireWrite(); }
+void ReadWriteMutex::acquireWrite() const {
+ impl_->acquireWrite();
+}
-bool ReadWriteMutex::attemptRead() const { return impl_->attemptRead(); }
+bool ReadWriteMutex::attemptRead() const {
+ return impl_->attemptRead();
+}
-bool ReadWriteMutex::attemptWrite() const { return impl_->attemptWrite(); }
+bool ReadWriteMutex::attemptWrite() const {
+ return impl_->attemptWrite();
+}
-void ReadWriteMutex::release() const { impl_->release(); }
+void ReadWriteMutex::release() const {
+ impl_->release();
+}
-NoStarveReadWriteMutex::NoStarveReadWriteMutex() : writerWaiting_(false) {}
+NoStarveReadWriteMutex::NoStarveReadWriteMutex() : writerWaiting_(false) {
+}
-void NoStarveReadWriteMutex::acquireRead() const
-{
+void NoStarveReadWriteMutex::acquireRead() const {
if (writerWaiting_) {
// writer is waiting, block on the writer's mutex until he's done with it
mutex_.lock();
@@ -332,8 +354,7 @@ void NoStarveReadWriteMutex::acquireRead() const
ReadWriteMutex::acquireRead();
}
-void NoStarveReadWriteMutex::acquireWrite() const
-{
+void NoStarveReadWriteMutex::acquireWrite() const {
// if we can acquire the rwlock the easy way, we're done
if (attemptWrite()) {
return;
@@ -348,6 +369,6 @@ void NoStarveReadWriteMutex::acquireWrite() const
writerWaiting_ = false;
mutex_.unlock();
}
-
-}}} // apache::thrift::concurrency
-
+}
+}
+} // apache::thrift::concurrency
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/concurrency/Mutex.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/Mutex.h b/lib/cpp/src/thrift/concurrency/Mutex.h
index 3cd8440..e3142fa 100644
--- a/lib/cpp/src/thrift/concurrency/Mutex.h
+++ b/lib/cpp/src/thrift/concurrency/Mutex.h
@@ -23,7 +23,9 @@
#include <boost/shared_ptr.hpp>
#include <boost/noncopyable.hpp>
-namespace apache { namespace thrift { namespace concurrency {
+namespace apache {
+namespace thrift {
+namespace concurrency {
#ifndef THRIFT_NO_CONTENTION_PROFILING
@@ -46,8 +48,7 @@ namespace apache { namespace thrift { namespace concurrency {
* particular time period.
*/
typedef void (*MutexWaitCallback)(const void* id, int64_t waitTimeMicros);
-void enableMutexProfiling(int32_t profilingSampleRate,
- MutexWaitCallback callback);
+void enableMutexProfiling(int32_t profilingSampleRate, MutexWaitCallback callback);
#endif
@@ -57,7 +58,7 @@ void enableMutexProfiling(int32_t profilingSampleRate,
* @version $Id:$
*/
class Mutex {
- public:
+public:
typedef void (*Initializer)(void*);
Mutex(Initializer init = DEFAULT_INITIALIZER);
@@ -73,8 +74,7 @@ class Mutex {
static void ADAPTIVE_INITIALIZER(void*);
static void RECURSIVE_INITIALIZER(void*);
- private:
-
+private:
class impl;
boost::shared_ptr<impl> impl_;
};
@@ -96,7 +96,6 @@ public:
virtual void release() const;
private:
-
class impl;
boost::shared_ptr<impl> impl_;
};
@@ -121,7 +120,7 @@ private:
};
class Guard : boost::noncopyable {
- public:
+public:
Guard(const Mutex& value, int64_t timeout = 0) : mutex_(&value) {
if (timeout == 0) {
value.lock();
@@ -141,48 +140,40 @@ class Guard : boost::noncopyable {
}
}
- operator bool() const {
- return (mutex_ != NULL);
- }
+ operator bool() const { return (mutex_ != NULL); }
- private:
+private:
const Mutex* mutex_;
};
// Can be used as second argument to RWGuard to make code more readable
// as to whether we're doing acquireRead() or acquireWrite().
-enum RWGuardType {
- RW_READ = 0,
- RW_WRITE = 1
-};
-
+enum RWGuardType { RW_READ = 0, RW_WRITE = 1 };
class RWGuard : boost::noncopyable {
- public:
- RWGuard(const ReadWriteMutex& value, bool write = false)
- : rw_mutex_(value) {
- if (write) {
- rw_mutex_.acquireWrite();
- } else {
- rw_mutex_.acquireRead();
- }
+public:
+ RWGuard(const ReadWriteMutex& value, bool write = false) : rw_mutex_(value) {
+ if (write) {
+ rw_mutex_.acquireWrite();
+ } else {
+ rw_mutex_.acquireRead();
}
+ }
- RWGuard(const ReadWriteMutex& value, RWGuardType type)
- : rw_mutex_(value) {
- if (type == RW_WRITE) {
- rw_mutex_.acquireWrite();
- } else {
- rw_mutex_.acquireRead();
- }
- }
- ~RWGuard() {
- rw_mutex_.release();
+ RWGuard(const ReadWriteMutex& value, RWGuardType type) : rw_mutex_(value) {
+ if (type == RW_WRITE) {
+ rw_mutex_.acquireWrite();
+ } else {
+ rw_mutex_.acquireRead();
}
- private:
- const ReadWriteMutex& rw_mutex_;
-};
+ }
+ ~RWGuard() { rw_mutex_.release(); }
-}}} // apache::thrift::concurrency
+private:
+ const ReadWriteMutex& rw_mutex_;
+};
+}
+}
+} // apache::thrift::concurrency
#endif // #ifndef _THRIFT_CONCURRENCY_MUTEX_H_
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/concurrency/PlatformThreadFactory.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/PlatformThreadFactory.h b/lib/cpp/src/thrift/concurrency/PlatformThreadFactory.h
index 6e46dfc..311c3db 100644
--- a/lib/cpp/src/thrift/concurrency/PlatformThreadFactory.h
+++ b/lib/cpp/src/thrift/concurrency/PlatformThreadFactory.h
@@ -20,6 +20,7 @@
#ifndef _THRIFT_CONCURRENCY_PLATFORMTHREADFACTORY_H_
#define _THRIFT_CONCURRENCY_PLATFORMTHREADFACTORY_H_ 1
+// clang-format off
#include <thrift/thrift-config.h>
#if USE_BOOST_THREAD
# include <thrift/concurrency/BoostThreadFactory.h>
@@ -28,9 +29,13 @@
#else
# include <thrift/concurrency/PosixThreadFactory.h>
#endif
+// clang-format on
-namespace apache { namespace thrift { namespace concurrency {
+namespace apache {
+namespace thrift {
+namespace concurrency {
+// clang-format off
#ifdef USE_BOOST_THREAD
typedef BoostThreadFactory PlatformThreadFactory;
#elif USE_STD_THREAD
@@ -38,7 +43,10 @@ namespace apache { namespace thrift { namespace concurrency {
#else
typedef PosixThreadFactory PlatformThreadFactory;
#endif
+// clang-format on
-}}} // apache::thrift::concurrency
+}
+}
+} // apache::thrift::concurrency
#endif // #ifndef _THRIFT_CONCURRENCY_PLATFORMTHREADFACTORY_H_
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp b/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp
index 52ceead..47c5034 100644
--- a/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp
+++ b/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp
@@ -23,7 +23,7 @@
#include <thrift/concurrency/Exception.h>
#if GOOGLE_PERFTOOLS_REGISTER_THREAD
-# include <google/profiler.h>
+#include <google/profiler.h>
#endif
#include <assert.h>
@@ -33,7 +33,9 @@
#include <boost/weak_ptr.hpp>
-namespace apache { namespace thrift { namespace concurrency {
+namespace apache {
+namespace thrift {
+namespace concurrency {
using boost::shared_ptr;
using boost::weak_ptr;
@@ -43,22 +45,15 @@ using boost::weak_ptr;
*
* @version $Id:$
*/
-class PthreadThread: public Thread {
- public:
-
- enum STATE {
- uninitialized,
- starting,
- started,
- stopping,
- stopped
- };
+class PthreadThread : public Thread {
+public:
+ enum STATE { uninitialized, starting, started, stopping, stopped };
static const int MB = 1024 * 1024;
static void* threadMain(void* arg);
- private:
+private:
pthread_t pthread_;
STATE state_;
int policy_;
@@ -67,19 +62,23 @@ class PthreadThread: public Thread {
weak_ptr<PthreadThread> self_;
bool detached_;
- public:
-
- PthreadThread(int policy, int priority, int stackSize, bool detached, shared_ptr<Runnable> runnable) :
+public:
+ PthreadThread(int policy,
+ int priority,
+ int stackSize,
+ bool detached,
+ shared_ptr<Runnable> runnable)
+ :
#ifndef _WIN32
- pthread_(0),
+ pthread_(0),
#endif // _WIN32
- state_(uninitialized),
- policy_(policy),
- priority_(priority),
- stackSize_(stackSize),
- detached_(detached) {
+ state_(uninitialized),
+ policy_(policy),
+ priority_(priority),
+ stackSize_(stackSize),
+ detached_(detached) {
this->Thread::runnable(runnable);
}
@@ -88,10 +87,10 @@ class PthreadThread: public Thread {
/* Nothing references this thread, if is is not detached, do a join
now, otherwise the thread-id and, possibly, other resources will
be leaked. */
- if(!detached_) {
+ if (!detached_) {
try {
join();
- } catch(...) {
+ } catch (...) {
// We're really hosed.
}
}
@@ -104,14 +103,13 @@ class PthreadThread: public Thread {
pthread_attr_t thread_attr;
if (pthread_attr_init(&thread_attr) != 0) {
- throw SystemResourceException("pthread_attr_init failed");
+ throw SystemResourceException("pthread_attr_init failed");
}
- if(pthread_attr_setdetachstate(&thread_attr,
- detached_ ?
- PTHREAD_CREATE_DETACHED :
- PTHREAD_CREATE_JOINABLE) != 0) {
- throw SystemResourceException("pthread_attr_setdetachstate failed");
+ if (pthread_attr_setdetachstate(&thread_attr,
+ detached_ ? PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE)
+ != 0) {
+ throw SystemResourceException("pthread_attr_setdetachstate failed");
}
// Set thread stack size
@@ -119,11 +117,12 @@ class PthreadThread: public Thread {
throw SystemResourceException("pthread_attr_setstacksize failed");
}
- // Set thread policy
- #ifdef _WIN32
- //WIN32 Pthread implementation doesn't seem to support sheduling policies other then PosixThreadFactory::OTHER - runtime error
- policy_ = PosixThreadFactory::OTHER;
- #endif
+// Set thread policy
+#ifdef _WIN32
+ // WIN32 Pthread implementation doesn't seem to support sheduling policies other then
+ // PosixThreadFactory::OTHER - runtime error
+ policy_ = PosixThreadFactory::OTHER;
+#endif
if (pthread_attr_setschedpolicy(&thread_attr, policy_) != 0) {
throw SystemResourceException("pthread_attr_setschedpolicy failed");
@@ -217,7 +216,7 @@ void* PthreadThread::threadMain(void* arg) {
*/
class PosixThreadFactory::Impl {
- private:
+private:
POLICY policy_;
PRIORITY priority_;
int stackSize_;
@@ -269,13 +268,9 @@ class PosixThreadFactory::Impl {
}
}
- public:
-
- Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
- policy_(policy),
- priority_(priority),
- stackSize_(stackSize),
- detached_(detached) {}
+public:
+ Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached)
+ : policy_(policy), priority_(priority), stackSize_(stackSize), detached_(detached) {}
/**
* Creates a new POSIX thread to run the runnable object
@@ -283,7 +278,12 @@ class PosixThreadFactory::Impl {
* @param runnable A runnable object
*/
shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
- shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_), toPthreadPriority(policy_, priority_), stackSize_, detached_, runnable));
+ shared_ptr<PthreadThread> result
+ = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_),
+ toPthreadPriority(policy_, priority_),
+ stackSize_,
+ detached_,
+ runnable));
result->weakRef(result);
runnable->thread(result);
return result;
@@ -314,28 +314,47 @@ class PosixThreadFactory::Impl {
#else
return (Thread::id_t)pthread_self().p;
#endif // _WIN32
-
}
-
};
-PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
- impl_(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
-
-shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return impl_->newThread(runnable); }
+PosixThreadFactory::PosixThreadFactory(POLICY policy,
+ PRIORITY priority,
+ int stackSize,
+ bool detached)
+ : impl_(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {
+}
-int PosixThreadFactory::getStackSize() const { return impl_->getStackSize(); }
+shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const {
+ return impl_->newThread(runnable);
+}
-void PosixThreadFactory::setStackSize(int value) { impl_->setStackSize(value); }
+int PosixThreadFactory::getStackSize() const {
+ return impl_->getStackSize();
+}
-PosixThreadFactory::PRIORITY PosixThreadFactory::getPriority() const { return impl_->getPriority(); }
+void PosixThreadFactory::setStackSize(int value) {
+ impl_->setStackSize(value);
+}
-void PosixThreadFactory::setPriority(PosixThreadFactory::PRIORITY value) { impl_->setPriority(value); }
+PosixThreadFactory::PRIORITY PosixThreadFactory::getPriority() const {
+ return impl_->getPriority();
+}
-bool PosixThreadFactory::isDetached() const { return impl_->isDetached(); }
+void PosixThreadFactory::setPriority(PosixThreadFactory::PRIORITY value) {
+ impl_->setPriority(value);
+}
-void PosixThreadFactory::setDetached(bool value) { impl_->setDetached(value); }
+bool PosixThreadFactory::isDetached() const {
+ return impl_->isDetached();
+}
-Thread::id_t PosixThreadFactory::getCurrentThreadId() const { return impl_->getCurrentThreadId(); }
+void PosixThreadFactory::setDetached(bool value) {
+ impl_->setDetached(value);
+}
-}}} // apache::thrift::concurrency
+Thread::id_t PosixThreadFactory::getCurrentThreadId() const {
+ return impl_->getCurrentThreadId();
+}
+}
+}
+} // apache::thrift::concurrency
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/concurrency/PosixThreadFactory.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/PosixThreadFactory.h b/lib/cpp/src/thrift/concurrency/PosixThreadFactory.h
index 72368ca..4004231 100644
--- a/lib/cpp/src/thrift/concurrency/PosixThreadFactory.h
+++ b/lib/cpp/src/thrift/concurrency/PosixThreadFactory.h
@@ -24,7 +24,9 @@
#include <boost/shared_ptr.hpp>
-namespace apache { namespace thrift { namespace concurrency {
+namespace apache {
+namespace thrift {
+namespace concurrency {
/**
* A thread factory to create posix threads
@@ -33,16 +35,11 @@ namespace apache { namespace thrift { namespace concurrency {
*/
class PosixThreadFactory : public ThreadFactory {
- public:
-
+public:
/**
* POSIX Thread scheduler policies
*/
- enum POLICY {
- OTHER,
- FIFO,
- ROUND_ROBIN
- };
+ enum POLICY { OTHER, FIFO, ROUND_ROBIN };
/**
* POSIX Thread scheduler relative priorities,
@@ -78,7 +75,10 @@ class PosixThreadFactory : public ThreadFactory {
* By default threads are not joinable.
*/
- PosixThreadFactory(POLICY policy=ROUND_ROBIN, PRIORITY priority=NORMAL, int stackSize=1, bool detached=true);
+ PosixThreadFactory(POLICY policy = ROUND_ROBIN,
+ PRIORITY priority = NORMAL,
+ int stackSize = 1,
+ bool detached = true);
// From ThreadFactory;
boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const;
@@ -120,11 +120,12 @@ class PosixThreadFactory : public ThreadFactory {
*/
virtual bool isDetached() const;
- private:
+private:
class Impl;
boost::shared_ptr<Impl> impl_;
};
-
-}}} // apache::thrift::concurrency
+}
+}
+} // apache::thrift::concurrency
#endif // #ifndef _THRIFT_CONCURRENCY_POSIXTHREADFACTORY_H_
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/concurrency/StdMonitor.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/StdMonitor.cpp b/lib/cpp/src/thrift/concurrency/StdMonitor.cpp
index cf257e6..7b3b209 100644
--- a/lib/cpp/src/thrift/concurrency/StdMonitor.cpp
+++ b/lib/cpp/src/thrift/concurrency/StdMonitor.cpp
@@ -30,7 +30,9 @@
#include <thread>
#include <mutex>
-namespace apache { namespace thrift { namespace concurrency {
+namespace apache {
+namespace thrift {
+namespace concurrency {
/**
* Monitor implementation using the std thread library
@@ -39,26 +41,12 @@ namespace apache { namespace thrift { namespace concurrency {
*/
class Monitor::Impl {
- public:
+public:
+ Impl() : ownedMutex_(new Mutex()), conditionVariable_(), mutex_(NULL) { init(ownedMutex_.get()); }
- Impl()
- : ownedMutex_(new Mutex()),
- conditionVariable_(),
- mutex_(NULL) {
- init(ownedMutex_.get());
- }
-
- Impl(Mutex* mutex)
- : ownedMutex_(),
- conditionVariable_(),
- mutex_(NULL) {
- init(mutex);
- }
+ Impl(Mutex* mutex) : ownedMutex_(), conditionVariable_(), mutex_(NULL) { init(mutex); }
- Impl(Monitor* monitor)
- : ownedMutex_(),
- conditionVariable_(),
- mutex_(NULL) {
+ Impl(Monitor* monitor) : ownedMutex_(), conditionVariable_(), mutex_(NULL) {
init(&(monitor->mutex()));
}
@@ -78,8 +66,7 @@ class Monitor::Impl {
if (result == THRIFT_ETIMEDOUT) {
throw TimedOutException();
} else if (result != 0) {
- throw TException(
- "Monitor::wait() failed");
+ throw TException("Monitor::wait() failed");
}
}
@@ -95,12 +82,12 @@ class Monitor::Impl {
}
assert(mutex_);
- std::timed_mutex* mutexImpl =
- static_cast<std::timed_mutex*>(mutex_->getUnderlyingImpl());
+ std::timed_mutex* mutexImpl = static_cast<std::timed_mutex*>(mutex_->getUnderlyingImpl());
assert(mutexImpl);
std::unique_lock<std::timed_mutex> lock(*mutexImpl, std::adopt_lock);
- bool timedout = (conditionVariable_.wait_for(lock, std::chrono::milliseconds(timeout_ms)) == std::cv_status::timeout);
+ bool timedout = (conditionVariable_.wait_for(lock, std::chrono::milliseconds(timeout_ms))
+ == std::cv_status::timeout);
lock.release();
return (timedout ? THRIFT_ETIMEDOUT : 0);
}
@@ -111,7 +98,7 @@ class Monitor::Impl {
*/
int waitForTime(const THRIFT_TIMESPEC* abstime) {
struct timeval temp;
- temp.tv_sec = static_cast<long>(abstime->tv_sec);
+ temp.tv_sec = static_cast<long>(abstime->tv_sec);
temp.tv_usec = static_cast<long>(abstime->tv_nsec) / 1000;
return waitForTime(&temp);
}
@@ -122,24 +109,24 @@ class Monitor::Impl {
*/
int waitForTime(const struct timeval* abstime) {
assert(mutex_);
- std::timed_mutex* mutexImpl =
- static_cast<std::timed_mutex*>(mutex_->getUnderlyingImpl());
+ std::timed_mutex* mutexImpl = static_cast<std::timed_mutex*>(mutex_->getUnderlyingImpl());
assert(mutexImpl);
struct timeval currenttime;
Util::toTimeval(currenttime, Util::currentTime());
- long tv_sec = static_cast<long>(abstime->tv_sec - currenttime.tv_sec);
+ long tv_sec = static_cast<long>(abstime->tv_sec - currenttime.tv_sec);
long tv_usec = static_cast<long>(abstime->tv_usec - currenttime.tv_usec);
- if(tv_sec < 0)
+ if (tv_sec < 0)
tv_sec = 0;
- if(tv_usec < 0)
+ if (tv_usec < 0)
tv_usec = 0;
std::unique_lock<std::timed_mutex> lock(*mutexImpl, std::adopt_lock);
bool timedout = (conditionVariable_.wait_for(lock,
- std::chrono::seconds(tv_sec) +
- std::chrono::microseconds(tv_usec)) == std::cv_status::timeout);
+ std::chrono::seconds(tv_sec)
+ + std::chrono::microseconds(tv_usec))
+ == std::cv_status::timeout);
lock.release();
return (timedout ? THRIFT_ETIMEDOUT : 0);
}
@@ -150,8 +137,7 @@ class Monitor::Impl {
*/
int waitForever() {
assert(mutex_);
- std::timed_mutex* mutexImpl =
- static_cast<std::timed_mutex*>(mutex_->getUnderlyingImpl());
+ std::timed_mutex* mutexImpl = static_cast<std::timed_mutex*>(mutex_->getUnderlyingImpl());
assert(mutexImpl);
std::unique_lock<std::timed_mutex> lock(*mutexImpl, std::adopt_lock);
@@ -160,39 +146,44 @@ class Monitor::Impl {
return 0;
}
+ void notify() { conditionVariable_.notify_one(); }
- void notify() {
- conditionVariable_.notify_one();
- }
-
- void notifyAll() {
- conditionVariable_.notify_all();
- }
+ void notifyAll() { conditionVariable_.notify_all(); }
- private:
-
- void init(Mutex* mutex) {
- mutex_ = mutex;
- }
+private:
+ void init(Mutex* mutex) { mutex_ = mutex; }
const std::unique_ptr<Mutex> ownedMutex_;
std::condition_variable_any conditionVariable_;
Mutex* mutex_;
};
-Monitor::Monitor() : impl_(new Monitor::Impl()) {}
-Monitor::Monitor(Mutex* mutex) : impl_(new Monitor::Impl(mutex)) {}
-Monitor::Monitor(Monitor* monitor) : impl_(new Monitor::Impl(monitor)) {}
+Monitor::Monitor() : impl_(new Monitor::Impl()) {
+}
+Monitor::Monitor(Mutex* mutex) : impl_(new Monitor::Impl(mutex)) {
+}
+Monitor::Monitor(Monitor* monitor) : impl_(new Monitor::Impl(monitor)) {
+}
-Monitor::~Monitor() { delete impl_; }
+Monitor::~Monitor() {
+ delete impl_;
+}
-Mutex& Monitor::mutex() const { return const_cast<Monitor::Impl*>(impl_)->mutex(); }
+Mutex& Monitor::mutex() const {
+ return const_cast<Monitor::Impl*>(impl_)->mutex();
+}
-void Monitor::lock() const { const_cast<Monitor::Impl*>(impl_)->lock(); }
+void Monitor::lock() const {
+ const_cast<Monitor::Impl*>(impl_)->lock();
+}
-void Monitor::unlock() const { const_cast<Monitor::Impl*>(impl_)->unlock(); }
+void Monitor::unlock() const {
+ const_cast<Monitor::Impl*>(impl_)->unlock();
+}
-void Monitor::wait(int64_t timeout) const { const_cast<Monitor::Impl*>(impl_)->wait(timeout); }
+void Monitor::wait(int64_t timeout) const {
+ const_cast<Monitor::Impl*>(impl_)->wait(timeout);
+}
int Monitor::waitForTime(const THRIFT_TIMESPEC* abstime) const {
return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime);
@@ -210,8 +201,13 @@ int Monitor::waitForever() const {
return const_cast<Monitor::Impl*>(impl_)->waitForever();
}
-void Monitor::notify() const { const_cast<Monitor::Impl*>(impl_)->notify(); }
-
-void Monitor::notifyAll() const { const_cast<Monitor::Impl*>(impl_)->notifyAll(); }
+void Monitor::notify() const {
+ const_cast<Monitor::Impl*>(impl_)->notify();
+}
-}}} // apache::thrift::concurrency
+void Monitor::notifyAll() const {
+ const_cast<Monitor::Impl*>(impl_)->notifyAll();
+}
+}
+}
+} // apache::thrift::concurrency
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/concurrency/StdMutex.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/StdMutex.cpp b/lib/cpp/src/thrift/concurrency/StdMutex.cpp
index 28f889a..69678a2 100644
--- a/lib/cpp/src/thrift/concurrency/StdMutex.cpp
+++ b/lib/cpp/src/thrift/concurrency/StdMutex.cpp
@@ -26,30 +26,42 @@
#include <chrono>
#include <mutex>
-namespace apache { namespace thrift { namespace concurrency {
+namespace apache {
+namespace thrift {
+namespace concurrency {
/**
* Implementation of Mutex class using C++11 std::timed_mutex
*
* @version $Id:$
*/
-class Mutex::impl : public std::timed_mutex {
-};
+class Mutex::impl : public std::timed_mutex {};
-Mutex::Mutex(Initializer init) : impl_(new Mutex::impl()) {}
+Mutex::Mutex(Initializer init) : impl_(new Mutex::impl()) {
+}
-void* Mutex::getUnderlyingImpl() const { return impl_.get(); }
+void* Mutex::getUnderlyingImpl() const {
+ return impl_.get();
+}
-void Mutex::lock() const { impl_->lock(); }
+void Mutex::lock() const {
+ impl_->lock();
+}
-bool Mutex::trylock() const { return impl_->try_lock(); }
+bool Mutex::trylock() const {
+ return impl_->try_lock();
+}
-bool Mutex::timedlock(int64_t ms) const { return impl_->try_lock_for(std::chrono::milliseconds(ms)); }
+bool Mutex::timedlock(int64_t ms) const {
+ return impl_->try_lock_for(std::chrono::milliseconds(ms));
+}
-void Mutex::unlock() const { impl_->unlock(); }
+void Mutex::unlock() const {
+ impl_->unlock();
+}
void Mutex::DEFAULT_INITIALIZER(void* arg) {
}
-
-}}} // apache::thrift::concurrency
-
+}
+}
+} // apache::thrift::concurrency
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp b/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp
index 6014b32..1ff4e73 100644
--- a/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp
+++ b/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp
@@ -30,7 +30,9 @@
#include <boost/weak_ptr.hpp>
#include <thread>
-namespace apache { namespace thrift { namespace concurrency {
+namespace apache {
+namespace thrift {
+namespace concurrency {
/**
* The C++11 thread class.
@@ -41,37 +43,28 @@ namespace apache { namespace thrift { namespace concurrency {
*
* @version $Id:$
*/
-class StdThread: public Thread, public boost::enable_shared_from_this<StdThread> {
- public:
+class StdThread : public Thread, public boost::enable_shared_from_this<StdThread> {
+public:
+ enum STATE { uninitialized, starting, started, stopping, stopped };
- enum STATE {
- uninitialized,
- starting,
- started,
- stopping,
- stopped
- };
+ static void threadMain(boost::shared_ptr<StdThread> thread);
- static void threadMain(boost::shared_ptr<StdThread> thread);
-
- private:
+private:
std::unique_ptr<std::thread> thread_;
STATE state_;
bool detached_;
- public:
-
- StdThread(bool detached, boost::shared_ptr<Runnable> runnable) :
- state_(uninitialized),
- detached_(detached) {
+public:
+ StdThread(bool detached, boost::shared_ptr<Runnable> runnable)
+ : state_(uninitialized), detached_(detached) {
this->Thread::runnable(runnable);
}
~StdThread() {
- if(!detached_) {
+ if (!detached_) {
try {
join();
- } catch(...) {
+ } catch (...) {
// We're really hosed.
}
}
@@ -87,7 +80,7 @@ class StdThread: public Thread, public boost::enable_shared_from_this<StdThread>
thread_ = std::unique_ptr<std::thread>(new std::thread(threadMain, selfRef));
- if(detached_)
+ if (detached_)
thread_->detach();
}
@@ -97,9 +90,7 @@ class StdThread: public Thread, public boost::enable_shared_from_this<StdThread>
}
}
- Thread::id_t getId() {
- return thread_.get() ? thread_->get_id() : std::thread::id();
- }
+ Thread::id_t getId() { return thread_.get() ? thread_->get_id() : std::thread::id(); }
boost::shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
@@ -130,13 +121,11 @@ void StdThread::threadMain(boost::shared_ptr<StdThread> thread) {
*/
class StdThreadFactory::Impl {
- private:
+private:
bool detached_;
- public:
-
- Impl(bool detached) :
- detached_(detached) {}
+public:
+ Impl(bool detached) : detached_(detached) {}
/**
* Creates a new std::thread to run the runnable object
@@ -144,7 +133,8 @@ class StdThreadFactory::Impl {
* @param runnable A runnable object
*/
boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const {
- boost::shared_ptr<StdThread> result = boost::shared_ptr<StdThread>(new StdThread(detached_, runnable));
+ boost::shared_ptr<StdThread> result
+ = boost::shared_ptr<StdThread>(new StdThread(detached_, runnable));
runnable->thread(result);
return result;
}
@@ -153,23 +143,29 @@ class StdThreadFactory::Impl {
void setDetached(bool value) { detached_ = value; }
- Thread::id_t getCurrentThreadId() const {
- return std::this_thread::get_id();
- }
-
+ Thread::id_t getCurrentThreadId() const { return std::this_thread::get_id(); }
};
-StdThreadFactory::StdThreadFactory(bool detached) :
- impl_(new StdThreadFactory::Impl(detached)) {}
-
-boost::shared_ptr<Thread> StdThreadFactory::newThread(boost::shared_ptr<Runnable> runnable) const { return impl_->newThread(runnable); }
+StdThreadFactory::StdThreadFactory(bool detached) : impl_(new StdThreadFactory::Impl(detached)) {
+}
-bool StdThreadFactory::isDetached() const { return impl_->isDetached(); }
+boost::shared_ptr<Thread> StdThreadFactory::newThread(boost::shared_ptr<Runnable> runnable) const {
+ return impl_->newThread(runnable);
+}
-void StdThreadFactory::setDetached(bool value) { impl_->setDetached(value); }
+bool StdThreadFactory::isDetached() const {
+ return impl_->isDetached();
+}
-Thread::id_t StdThreadFactory::getCurrentThreadId() const { return impl_->getCurrentThreadId(); }
+void StdThreadFactory::setDetached(bool value) {
+ impl_->setDetached(value);
+}
-}}} // apache::thrift::concurrency
+Thread::id_t StdThreadFactory::getCurrentThreadId() const {
+ return impl_->getCurrentThreadId();
+}
+}
+}
+} // apache::thrift::concurrency
#endif // USE_STD_THREAD
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/concurrency/StdThreadFactory.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/StdThreadFactory.h b/lib/cpp/src/thrift/concurrency/StdThreadFactory.h
index 307f970..fb86bbf 100644
--- a/lib/cpp/src/thrift/concurrency/StdThreadFactory.h
+++ b/lib/cpp/src/thrift/concurrency/StdThreadFactory.h
@@ -24,7 +24,9 @@
#include <boost/shared_ptr.hpp>
-namespace apache { namespace thrift { namespace concurrency {
+namespace apache {
+namespace thrift {
+namespace concurrency {
/**
* A thread factory to create std::threads.
@@ -33,8 +35,7 @@ namespace apache { namespace thrift { namespace concurrency {
*/
class StdThreadFactory : public ThreadFactory {
- public:
-
+public:
/**
* Std thread factory. All threads created by a factory are reference-counted
* via boost::shared_ptr and boost::weak_ptr. The factory guarantees that threads and
@@ -44,7 +45,7 @@ class StdThreadFactory : public ThreadFactory {
* By default threads are not joinable.
*/
- StdThreadFactory(bool detached=true);
+ StdThreadFactory(bool detached = true);
// From ThreadFactory;
boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const;
@@ -66,7 +67,8 @@ private:
class Impl;
boost::shared_ptr<Impl> impl_;
};
-
-}}} // apache::thrift::concurrency
+}
+}
+} // apache::thrift::concurrency
#endif // #ifndef _THRIFT_CONCURRENCY_STDTHREADFACTORY_H_
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/concurrency/Thread.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/Thread.h b/lib/cpp/src/thrift/concurrency/Thread.h
old mode 100755
new mode 100644
index 7012933..1d9153f
--- a/lib/cpp/src/thrift/concurrency/Thread.h
+++ b/lib/cpp/src/thrift/concurrency/Thread.h
@@ -27,16 +27,18 @@
#include <thrift/thrift-config.h>
#if USE_BOOST_THREAD
-# include <boost/thread.hpp>
+#include <boost/thread.hpp>
#elif USE_STD_THREAD
-# include <thread>
+#include <thread>
#else
-# ifdef HAVE_PTHREAD_H
-# include <pthread.h>
-# endif
+#ifdef HAVE_PTHREAD_H
+#include <pthread.h>
+#endif
#endif
-namespace apache { namespace thrift { namespace concurrency {
+namespace apache {
+namespace thrift {
+namespace concurrency {
class Thread;
@@ -47,8 +49,8 @@ class Thread;
*/
class Runnable {
- public:
- virtual ~Runnable() {};
+public:
+ virtual ~Runnable(){};
virtual void run() = 0;
/**
@@ -63,7 +65,7 @@ class Runnable {
*/
virtual void thread(boost::shared_ptr<Thread> value) { thread_ = value; }
- private:
+private:
boost::weak_ptr<Thread> thread_;
};
@@ -78,8 +80,7 @@ class Runnable {
*/
class Thread {
- public:
-
+public:
#if USE_BOOST_THREAD
typedef boost::thread::id id_t;
@@ -97,7 +98,7 @@ class Thread {
static inline id_t get_current() { return pthread_self(); }
#endif
- virtual ~Thread() {};
+ virtual ~Thread(){};
/**
* Starts the thread. Does platform specific thread creation and
@@ -122,12 +123,11 @@ class Thread {
*/
virtual boost::shared_ptr<Runnable> runnable() const { return _runnable; }
- protected:
+protected:
virtual void runnable(boost::shared_ptr<Runnable> value) { _runnable = value; }
- private:
+private:
boost::shared_ptr<Runnable> _runnable;
-
};
/**
@@ -136,17 +136,19 @@ class Thread {
*/
class ThreadFactory {
- public:
+public:
virtual ~ThreadFactory() {}
virtual boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const = 0;
- /** Gets the current thread id or unknown_thread_id if the current thread is not a thrift thread */
+ /** Gets the current thread id or unknown_thread_id if the current thread is not a thrift thread
+ */
static const Thread::id_t unknown_thread_id;
virtual Thread::id_t getCurrentThreadId() const = 0;
};
-
-}}} // apache::thrift::concurrency
+}
+}
+} // apache::thrift::concurrency
#endif // #ifndef _THRIFT_CONCURRENCY_THREAD_H_
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/ThreadManager.cpp b/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
index 204d5dc..9ff2c9a 100644
--- a/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
+++ b/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
@@ -32,9 +32,11 @@
#if defined(DEBUG)
#include <iostream>
-#endif //defined(DEBUG)
+#endif // defined(DEBUG)
-namespace apache { namespace thrift { namespace concurrency {
+namespace apache {
+namespace thrift {
+namespace concurrency {
using boost::shared_ptr;
using boost::dynamic_pointer_cast;
@@ -49,18 +51,18 @@ using boost::dynamic_pointer_cast;
*
* @version $Id:$
*/
-class ThreadManager::Impl : public ThreadManager {
-
- public:
- Impl() :
- workerCount_(0),
- workerMaxCount_(0),
- idleCount_(0),
- pendingTaskCountMax_(0),
- expiredCount_(0),
- state_(ThreadManager::UNINITIALIZED),
- monitor_(&mutex_),
- maxMonitor_(&mutex_) {}
+class ThreadManager::Impl : public ThreadManager {
+
+public:
+ Impl()
+ : workerCount_(0),
+ workerMaxCount_(0),
+ idleCount_(0),
+ pendingTaskCountMax_(0),
+ expiredCount_(0),
+ state_(ThreadManager::UNINITIALIZED),
+ monitor_(&mutex_),
+ maxMonitor_(&mutex_) {}
~Impl() { stop(); }
@@ -70,9 +72,7 @@ class ThreadManager::Impl : public ThreadManager {
void join() { stopImpl(true); }
- ThreadManager::STATE state() const {
- return state_;
- }
+ ThreadManager::STATE state() const { return state_; }
shared_ptr<ThreadFactory> threadFactory() const {
Synchronized s(monitor_);
@@ -88,9 +88,7 @@ class ThreadManager::Impl : public ThreadManager {
void removeWorker(size_t value);
- size_t idleWorkerCount() const {
- return idleCount_;
- }
+ size_t idleWorkerCount() const { return idleCount_; }
size_t workerCount() const {
Synchronized s(monitor_);
@@ -149,7 +147,6 @@ private:
ThreadManager::STATE state_;
shared_ptr<ThreadFactory> threadFactory_;
-
friend class ThreadManager::Task;
std::queue<shared_ptr<Task> > tasks_;
Mutex mutex_;
@@ -165,18 +162,13 @@ private:
class ThreadManager::Task : public Runnable {
- public:
- enum STATE {
- WAITING,
- EXECUTING,
- CANCELLED,
- COMPLETE
- };
+public:
+ enum STATE { WAITING, EXECUTING, CANCELLED, COMPLETE };
- Task(shared_ptr<Runnable> runnable, int64_t expiration=0LL) :
- runnable_(runnable),
- state_(WAITING),
- expireTime_(expiration != 0LL ? Util::currentTime() + expiration : 0LL) {}
+ Task(shared_ptr<Runnable> runnable, int64_t expiration = 0LL)
+ : runnable_(runnable),
+ state_(WAITING),
+ expireTime_(expiration != 0LL ? Util::currentTime() + expiration : 0LL) {}
~Task() {}
@@ -187,46 +179,32 @@ class ThreadManager::Task : public Runnable {
}
}
- shared_ptr<Runnable> getRunnable() {
- return runnable_;
- }
+ shared_ptr<Runnable> getRunnable() { return runnable_; }
- int64_t getExpireTime() const {
- return expireTime_;
- }
+ int64_t getExpireTime() const { return expireTime_; }
- private:
+private:
shared_ptr<Runnable> runnable_;
friend class ThreadManager::Worker;
STATE state_;
int64_t expireTime_;
};
-class ThreadManager::Worker: public Runnable {
- enum STATE {
- UNINITIALIZED,
- STARTING,
- STARTED,
- STOPPING,
- STOPPED
- };
-
- public:
- Worker(ThreadManager::Impl* manager) :
- manager_(manager),
- state_(UNINITIALIZED),
- idle_(false) {}
+class ThreadManager::Worker : public Runnable {
+ enum STATE { UNINITIALIZED, STARTING, STARTED, STOPPING, STOPPED };
+
+public:
+ Worker(ThreadManager::Impl* manager) : manager_(manager), state_(UNINITIALIZED), idle_(false) {}
~Worker() {}
- private:
+private:
bool isActive() const {
- return
- (manager_->workerCount_ <= manager_->workerMaxCount_) ||
- (manager_->state_ == JOINING && !manager_->tasks_.empty());
+ return (manager_->workerCount_ <= manager_->workerMaxCount_)
+ || (manager_->state_ == JOINING && !manager_->tasks_.empty());
}
- public:
+public:
/**
* Worker entry point
*
@@ -296,8 +274,8 @@ class ThreadManager::Worker: public Runnable {
/* If we have a pending task max and we just dropped below it, wakeup any
thread that might be blocked on add. */
- if (manager_->pendingTaskCountMax_ != 0 &&
- manager_->tasks_.size() <= manager_->pendingTaskCountMax_ - 1) {
+ if (manager_->pendingTaskCountMax_ != 0
+ && manager_->tasks_.size() <= manager_->pendingTaskCountMax_ - 1) {
manager_->maxMonitor_.notify();
}
}
@@ -312,7 +290,7 @@ class ThreadManager::Worker: public Runnable {
if (task->state_ == ThreadManager::Task::EXECUTING) {
try {
task->run();
- } catch(...) {
+ } catch (...) {
// XXX need to log this
}
}
@@ -330,18 +308,18 @@ class ThreadManager::Worker: public Runnable {
return;
}
- private:
- ThreadManager::Impl* manager_;
- friend class ThreadManager::Impl;
- STATE state_;
- bool idle_;
+private:
+ ThreadManager::Impl* manager_;
+ friend class ThreadManager::Impl;
+ STATE state_;
+ bool idle_;
};
-
- void ThreadManager::Impl::addWorker(size_t value) {
+void ThreadManager::Impl::addWorker(size_t value) {
std::set<shared_ptr<Thread> > newThreads;
for (size_t ix = 0; ix < value; ix++) {
- shared_ptr<ThreadManager::Worker> worker = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this));
+ shared_ptr<ThreadManager::Worker> worker
+ = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this));
newThreads.insert(threadFactory_->newThread(worker));
}
@@ -351,8 +329,10 @@ class ThreadManager::Worker: public Runnable {
workers_.insert(newThreads.begin(), newThreads.end());
}
- for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
- shared_ptr<ThreadManager::Worker> worker = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
+ for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end();
+ ix++) {
+ shared_ptr<ThreadManager::Worker> worker
+ = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
worker->state_ = ThreadManager::Worker::STARTING;
(*ix)->start();
idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->getId(), *ix));
@@ -396,9 +376,8 @@ void ThreadManager::Impl::stopImpl(bool join) {
{
Synchronized s(monitor_);
- if (state_ != ThreadManager::STOPPING &&
- state_ != ThreadManager::JOINING &&
- state_ != ThreadManager::STOPPED) {
+ if (state_ != ThreadManager::STOPPING && state_ != ThreadManager::JOINING
+ && state_ != ThreadManager::STOPPED) {
doStop = true;
state_ = join ? ThreadManager::JOINING : ThreadManager::STOPPING;
}
@@ -416,7 +395,6 @@ void ThreadManager::Impl::stopImpl(bool join) {
Synchronized s(monitor_);
state_ = ThreadManager::STOPPED;
}
-
}
void ThreadManager::Impl::removeWorker(size_t value) {
@@ -445,7 +423,9 @@ void ThreadManager::Impl::removeWorker(size_t value) {
workerMonitor_.wait();
}
- for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin(); ix != deadWorkers_.end(); ix++) {
+ for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin();
+ ix != deadWorkers_.end();
+ ix++) {
idMap_.erase((*ix)->getId());
workers_.erase(*ix);
}
@@ -454,60 +434,61 @@ void ThreadManager::Impl::removeWorker(size_t value) {
}
}
- bool ThreadManager::Impl::canSleep() {
- const Thread::id_t id = threadFactory_->getCurrentThreadId();
- return idMap_.find(id) == idMap_.end();
- }
+bool ThreadManager::Impl::canSleep() {
+ const Thread::id_t id = threadFactory_->getCurrentThreadId();
+ return idMap_.find(id) == idMap_.end();
+}
- void ThreadManager::Impl::add(shared_ptr<Runnable> value,
- int64_t timeout,
- int64_t expiration) {
- Guard g(mutex_, timeout);
+void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration) {
+ Guard g(mutex_, timeout);
- if (!g) {
- throw TimedOutException();
- }
+ if (!g) {
+ throw TimedOutException();
+ }
- if (state_ != ThreadManager::STARTED) {
- throw IllegalStateException("ThreadManager::Impl::add ThreadManager "
- "not started");
- }
+ if (state_ != ThreadManager::STARTED) {
+ throw IllegalStateException(
+ "ThreadManager::Impl::add ThreadManager "
+ "not started");
+ }
- removeExpiredTasks();
- if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
- if (canSleep() && timeout >= 0) {
- while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
- // This is thread safe because the mutex is shared between monitors.
- maxMonitor_.wait(timeout);
- }
- } else {
- throw TooManyPendingTasksException();
+ removeExpiredTasks();
+ if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
+ if (canSleep() && timeout >= 0) {
+ while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
+ // This is thread safe because the mutex is shared between monitors.
+ maxMonitor_.wait(timeout);
}
+ } else {
+ throw TooManyPendingTasksException();
}
+ }
- tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value, expiration)));
+ tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value, expiration)));
- // If idle thread is available notify it, otherwise all worker threads are
- // running and will get around to this task in time.
- if (idleCount_ > 0) {
- monitor_.notify();
- }
+ // If idle thread is available notify it, otherwise all worker threads are
+ // running and will get around to this task in time.
+ if (idleCount_ > 0) {
+ monitor_.notify();
}
+}
void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
- (void) task;
+ (void)task;
Synchronized s(monitor_);
if (state_ != ThreadManager::STARTED) {
- throw IllegalStateException("ThreadManager::Impl::remove ThreadManager not "
- "started");
+ throw IllegalStateException(
+ "ThreadManager::Impl::remove ThreadManager not "
+ "started");
}
}
boost::shared_ptr<Runnable> ThreadManager::Impl::removeNextPending() {
Guard g(mutex_);
if (state_ != ThreadManager::STARTED) {
- throw IllegalStateException("ThreadManager::Impl::removeNextPending "
- "ThreadManager not started");
+ throw IllegalStateException(
+ "ThreadManager::Impl::removeNextPending "
+ "ThreadManager not started");
}
if (tasks_.empty()) {
@@ -543,18 +524,15 @@ void ThreadManager::Impl::removeExpiredTasks() {
}
}
-
void ThreadManager::Impl::setExpireCallback(ExpireCallback expireCallback) {
expireCallback_ = expireCallback;
}
class SimpleThreadManager : public ThreadManager::Impl {
- public:
- SimpleThreadManager(size_t workerCount=4, size_t pendingTaskCountMax=0) :
- workerCount_(workerCount),
- pendingTaskCountMax_(pendingTaskCountMax) {
- }
+public:
+ SimpleThreadManager(size_t workerCount = 4, size_t pendingTaskCountMax = 0)
+ : workerCount_(workerCount), pendingTaskCountMax_(pendingTaskCountMax) {}
void start() {
ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_);
@@ -562,20 +540,20 @@ class SimpleThreadManager : public ThreadManager::Impl {
addWorker(workerCount_);
}
- private:
+private:
const size_t workerCount_;
const size_t pendingTaskCountMax_;
Monitor monitor_;
};
-
shared_ptr<ThreadManager> ThreadManager::newThreadManager() {
return shared_ptr<ThreadManager>(new ThreadManager::Impl());
}
-shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count, size_t pendingTaskCountMax) {
+shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count,
+ size_t pendingTaskCountMax) {
return shared_ptr<ThreadManager>(new SimpleThreadManager(count, pendingTaskCountMax));
}
-
-}}} // apache::thrift::concurrency
-
+}
+}
+} // apache::thrift::concurrency
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/concurrency/ThreadManager.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/ThreadManager.h b/lib/cpp/src/thrift/concurrency/ThreadManager.h
index 0fedc88..7bb71d1 100644
--- a/lib/cpp/src/thrift/concurrency/ThreadManager.h
+++ b/lib/cpp/src/thrift/concurrency/ThreadManager.h
@@ -25,7 +25,9 @@
#include <sys/types.h>
#include <thrift/concurrency/Thread.h>
-namespace apache { namespace thrift { namespace concurrency {
+namespace apache {
+namespace thrift {
+namespace concurrency {
/**
* Thread Pool Manager and related classes
@@ -53,10 +55,10 @@ class ThreadManager;
*/
class ThreadManager {
- protected:
+protected:
ThreadManager() {}
- public:
+public:
typedef apache::thrift::stdcxx::function<void(boost::shared_ptr<Runnable>)> ExpireCallback;
virtual ~ThreadManager() {}
@@ -83,14 +85,7 @@ class ThreadManager {
*/
virtual void join() = 0;
- enum STATE {
- UNINITIALIZED,
- STARTING,
- STARTED,
- JOINING,
- STOPPING,
- STOPPED
- };
+ enum STATE { UNINITIALIZED, STARTING, STARTED, JOINING, STOPPING, STOPPED };
virtual STATE state() const = 0;
@@ -98,9 +93,9 @@ class ThreadManager {
virtual void threadFactory(boost::shared_ptr<ThreadFactory> value) = 0;
- virtual void addWorker(size_t value=1) = 0;
+ virtual void addWorker(size_t value = 1) = 0;
- virtual void removeWorker(size_t value=1) = 0;
+ virtual void removeWorker(size_t value = 1) = 0;
/**
* Gets the current number of idle worker threads
@@ -115,7 +110,7 @@ class ThreadManager {
/**
* Gets the current number of pending tasks
*/
- virtual size_t pendingTaskCount() const = 0;
+ virtual size_t pendingTaskCount() const = 0;
/**
* Gets the current number of pending and executing tasks
@@ -151,9 +146,9 @@ class ThreadManager {
*
* @throws TooManyPendingTasksException Pending task count exceeds max pending task count
*/
- virtual void add(boost::shared_ptr<Runnable>task,
- int64_t timeout=0LL,
- int64_t expiration=0LL) = 0;
+ virtual void add(boost::shared_ptr<Runnable> task,
+ int64_t timeout = 0LL,
+ int64_t expiration = 0LL) = 0;
/**
* Removes a pending task
@@ -187,7 +182,8 @@ class ThreadManager {
* a pendingTaskCountMax maximum pending tasks. The default, 0, specified no limit
* on pending tasks
*/
- static boost::shared_ptr<ThreadManager> newSimpleThreadManager(size_t count=4, size_t pendingTaskCountMax=0);
+ static boost::shared_ptr<ThreadManager> newSimpleThreadManager(size_t count = 4,
+ size_t pendingTaskCountMax = 0);
class Task;
@@ -195,7 +191,8 @@ class ThreadManager {
class Impl;
};
-
-}}} // apache::thrift::concurrency
+}
+}
+} // apache::thrift::concurrency
#endif // #ifndef _THRIFT_CONCURRENCY_THREADMANAGER_H_
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/concurrency/TimerManager.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/TimerManager.cpp b/lib/cpp/src/thrift/concurrency/TimerManager.cpp
index 6821b2e..60b8c85 100644
--- a/lib/cpp/src/thrift/concurrency/TimerManager.cpp
+++ b/lib/cpp/src/thrift/concurrency/TimerManager.cpp
@@ -25,7 +25,9 @@
#include <iostream>
#include <set>
-namespace apache { namespace thrift { namespace concurrency {
+namespace apache {
+namespace thrift {
+namespace concurrency {
using boost::shared_ptr;
@@ -36,20 +38,12 @@ using boost::shared_ptr;
*/
class TimerManager::Task : public Runnable {
- public:
- enum STATE {
- WAITING,
- EXECUTING,
- CANCELLED,
- COMPLETE
- };
+public:
+ enum STATE { WAITING, EXECUTING, CANCELLED, COMPLETE };
- Task(shared_ptr<Runnable> runnable) :
- runnable_(runnable),
- state_(WAITING) {}
+ Task(shared_ptr<Runnable> runnable) : runnable_(runnable), state_(WAITING) {}
- ~Task() {
- }
+ ~Task() {}
void run() {
if (state_ == EXECUTING) {
@@ -58,17 +52,16 @@ class TimerManager::Task : public Runnable {
}
}
- private:
+private:
shared_ptr<Runnable> runnable_;
friend class TimerManager::Dispatcher;
STATE state_;
};
-class TimerManager::Dispatcher: public Runnable {
+class TimerManager::Dispatcher : public Runnable {
- public:
- Dispatcher(TimerManager* manager) :
- manager_(manager) {}
+public:
+ Dispatcher(TimerManager* manager) : manager_(manager) {}
~Dispatcher() {}
@@ -93,16 +86,19 @@ class TimerManager::Dispatcher: public Runnable {
Synchronized s(manager_->monitor_);
task_iterator expiredTaskEnd;
int64_t now = Util::currentTime();
- while (manager_->state_ == TimerManager::STARTED &&
- (expiredTaskEnd = manager_->taskMap_.upper_bound(now)) == manager_->taskMap_.begin()) {
+ while (manager_->state_ == TimerManager::STARTED
+ && (expiredTaskEnd = manager_->taskMap_.upper_bound(now))
+ == manager_->taskMap_.begin()) {
int64_t timeout = 0LL;
if (!manager_->taskMap_.empty()) {
timeout = manager_->taskMap_.begin()->first - now;
}
- assert((timeout != 0 && manager_->taskCount_ > 0) || (timeout == 0 && manager_->taskCount_ == 0));
+ assert((timeout != 0 && manager_->taskCount_ > 0)
+ || (timeout == 0 && manager_->taskCount_ == 0));
try {
manager_->monitor_.wait(timeout);
- } catch (TimedOutException &) {}
+ } catch (TimedOutException&) {
+ }
now = Util::currentTime();
}
@@ -119,7 +115,9 @@ class TimerManager::Dispatcher: public Runnable {
}
}
- for (std::set<shared_ptr<Task> >::iterator ix = expiredTasks.begin(); ix != expiredTasks.end(); ix++) {
+ for (std::set<shared_ptr<Task> >::iterator ix = expiredTasks.begin();
+ ix != expiredTasks.end();
+ ix++) {
(*ix)->run();
}
@@ -135,20 +133,20 @@ class TimerManager::Dispatcher: public Runnable {
return;
}
- private:
+private:
TimerManager* manager_;
friend class TimerManager;
};
#if defined(_MSC_VER)
#pragma warning(push)
-#pragma warning(disable: 4355) // 'this' used in base member initializer list
+#pragma warning(disable : 4355) // 'this' used in base member initializer list
#endif
-TimerManager::TimerManager() :
- taskCount_(0),
- state_(TimerManager::UNINITIALIZED),
- dispatcher_(shared_ptr<Dispatcher>(new Dispatcher(this))) {
+TimerManager::TimerManager()
+ : taskCount_(0),
+ state_(TimerManager::UNINITIALIZED),
+ dispatcher_(shared_ptr<Dispatcher>(new Dispatcher(this))) {
}
#if defined(_MSC_VER)
@@ -163,7 +161,7 @@ TimerManager::~TimerManager() {
if (state_ != STOPPED) {
try {
stop();
- } catch(...) {
+ } catch (...) {
throw;
// uhoh
}
@@ -203,7 +201,7 @@ void TimerManager::stop() {
Synchronized s(monitor_);
if (state_ == TimerManager::UNINITIALIZED) {
state_ = TimerManager::STOPPED;
- } else if (state_ != STOPPING && state_ != STOPPED) {
+ } else if (state_ != STOPPING && state_ != STOPPED) {
doStop = true;
state_ = STOPPING;
monitor_.notifyAll();
@@ -227,7 +225,7 @@ shared_ptr<const ThreadFactory> TimerManager::threadFactory() const {
return threadFactory_;
}
-void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value) {
+void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value) {
Synchronized s(monitor_);
threadFactory_ = value;
}
@@ -252,7 +250,8 @@ void TimerManager::add(shared_ptr<Runnable> task, int64_t timeout) {
bool notifyRequired = (taskCount_ == 0) ? true : timeout < taskMap_.begin()->first;
taskCount_++;
- taskMap_.insert(std::pair<int64_t, shared_ptr<Task> >(timeout, shared_ptr<Task>(new Task(task))));
+ taskMap_.insert(
+ std::pair<int64_t, shared_ptr<Task> >(timeout, shared_ptr<Task>(new Task(task))));
// If the task map was empty, or if we have an expiration that is earlier
// than any previously seen, kick the dispatcher so it can update its
@@ -271,7 +270,7 @@ void TimerManager::add(shared_ptr<Runnable> task, const struct THRIFT_TIMESPEC&
int64_t now = Util::currentTime();
if (expiration < now) {
- throw InvalidArgumentException();
+ throw InvalidArgumentException();
}
add(task, expiration - now);
@@ -285,21 +284,23 @@ void TimerManager::add(shared_ptr<Runnable> task, const struct timeval& value) {
int64_t now = Util::currentTime();
if (expiration < now) {
- throw InvalidArgumentException();
+ throw InvalidArgumentException();
}
add(task, expiration - now);
}
void TimerManager::remove(shared_ptr<Runnable> task) {
- (void) task;
+ (void)task;
Synchronized s(monitor_);
if (state_ != TimerManager::STARTED) {
throw IllegalStateException();
}
}
-TimerManager::STATE TimerManager::state() const { return state_; }
-
-}}} // apache::thrift::concurrency
-
+TimerManager::STATE TimerManager::state() const {
+ return state_;
+}
+}
+}
+} // apache::thrift::concurrency
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/concurrency/TimerManager.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/TimerManager.h b/lib/cpp/src/thrift/concurrency/TimerManager.h
index d8200cb..3946827 100644
--- a/lib/cpp/src/thrift/concurrency/TimerManager.h
+++ b/lib/cpp/src/thrift/concurrency/TimerManager.h
@@ -28,7 +28,9 @@
#include <map>
#include <time.h>
-namespace apache { namespace thrift { namespace concurrency {
+namespace apache {
+namespace thrift {
+namespace concurrency {
/**
* Timer Manager
@@ -39,8 +41,7 @@ namespace apache { namespace thrift { namespace concurrency {
*/
class TimerManager {
- public:
-
+public:
TimerManager();
virtual ~TimerManager();
@@ -61,7 +62,7 @@ class TimerManager {
*/
virtual void stop();
- virtual size_t taskCount() const ;
+ virtual size_t taskCount() const;
/**
* Adds a task to be executed at some time in the future by a worker thread.
@@ -99,17 +100,11 @@ class TimerManager {
*/
virtual void remove(boost::shared_ptr<Runnable> task);
- enum STATE {
- UNINITIALIZED,
- STARTING,
- STARTED,
- STOPPING,
- STOPPED
- };
+ enum STATE { UNINITIALIZED, STARTING, STARTED, STOPPING, STOPPED };
virtual STATE state() const;
- private:
+private:
boost::shared_ptr<const ThreadFactory> threadFactory_;
class Task;
friend class Task;
@@ -124,7 +119,8 @@ class TimerManager {
typedef std::multimap<int64_t, boost::shared_ptr<TimerManager::Task> >::iterator task_iterator;
typedef std::pair<task_iterator, task_iterator> task_range;
};
-
-}}} // apache::thrift::concurrency
+}
+}
+} // apache::thrift::concurrency
#endif // #ifndef _THRIFT_CONCURRENCY_TIMERMANAGER_H_
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/concurrency/Util.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/Util.cpp b/lib/cpp/src/thrift/concurrency/Util.cpp
index 7d9085e..dd6d19f 100644
--- a/lib/cpp/src/thrift/concurrency/Util.cpp
+++ b/lib/cpp/src/thrift/concurrency/Util.cpp
@@ -26,16 +26,19 @@
#include <sys/time.h>
#endif
-namespace apache { namespace thrift { namespace concurrency {
+namespace apache {
+namespace thrift {
+namespace concurrency {
int64_t Util::currentTimeTicks(int64_t ticksPerSec) {
int64_t result;
struct timeval now;
int ret = THRIFT_GETTIMEOFDAY(&now, NULL);
assert(ret == 0);
- THRIFT_UNUSED_VARIABLE(ret); //squelching "unused variable" warning
+ THRIFT_UNUSED_VARIABLE(ret); // squelching "unused variable" warning
toTicks(result, now, ticksPerSec);
return result;
}
-
-}}} // apache::thrift::concurrency
+}
+}
+} // apache::thrift::concurrency
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/concurrency/Util.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/Util.h b/lib/cpp/src/thrift/concurrency/Util.h
index 63d80a2..ba070b6 100644
--- a/lib/cpp/src/thrift/concurrency/Util.h
+++ b/lib/cpp/src/thrift/concurrency/Util.h
@@ -31,7 +31,9 @@
#include <thrift/transport/PlatformSocket.h>
-namespace apache { namespace thrift { namespace concurrency {
+namespace apache {
+namespace thrift {
+namespace concurrency {
/**
* Utility methods
@@ -55,8 +57,7 @@ class Util {
static const int64_t NS_PER_US = NS_PER_S / US_PER_S;
static const int64_t US_PER_MS = US_PER_S / MS_PER_S;
- public:
-
+public:
/**
* Converts millisecond timestamp into a THRIFT_TIMESPEC struct
*
@@ -64,17 +65,20 @@ class Util {
* @param time or duration in milliseconds
*/
static void toTimespec(struct THRIFT_TIMESPEC& result, int64_t value) {
- result.tv_sec = value / MS_PER_S; // ms to s
+ result.tv_sec = value / MS_PER_S; // ms to s
result.tv_nsec = (value % MS_PER_S) * NS_PER_MS; // ms to ns
}
static void toTimeval(struct timeval& result, int64_t value) {
- result.tv_sec = static_cast<uint32_t>(value / MS_PER_S); // ms to s
+ result.tv_sec = static_cast<uint32_t>(value / MS_PER_S); // ms to s
result.tv_usec = static_cast<uint32_t>((value % MS_PER_S) * US_PER_MS); // ms to us
}
- static void toTicks(int64_t& result, int64_t secs, int64_t oldTicks,
- int64_t oldTicksPerSec, int64_t newTicksPerSec) {
+ static void toTicks(int64_t& result,
+ int64_t secs,
+ int64_t oldTicks,
+ int64_t oldTicksPerSec,
+ int64_t newTicksPerSec) {
result = secs * newTicksPerSec;
result += oldTicks * newTicksPerSec / oldTicksPerSec;
@@ -86,34 +90,28 @@ class Util {
/**
* Converts struct THRIFT_TIMESPEC to arbitrary-sized ticks since epoch
*/
- static void toTicks(int64_t& result,
- const struct THRIFT_TIMESPEC& value,
- int64_t ticksPerSec) {
+ static void toTicks(int64_t& result, const struct THRIFT_TIMESPEC& value, int64_t ticksPerSec) {
return toTicks(result, value.tv_sec, value.tv_nsec, NS_PER_S, ticksPerSec);
}
/**
* Converts struct timeval to arbitrary-sized ticks since epoch
*/
- static void toTicks(int64_t& result,
- const struct timeval& value,
- int64_t ticksPerSec) {
+ static void toTicks(int64_t& result, const struct timeval& value, int64_t ticksPerSec) {
return toTicks(result, value.tv_sec, value.tv_usec, US_PER_S, ticksPerSec);
}
/**
* Converts struct THRIFT_TIMESPEC to milliseconds
*/
- static void toMilliseconds(int64_t& result,
- const struct THRIFT_TIMESPEC& value) {
+ static void toMilliseconds(int64_t& result, const struct THRIFT_TIMESPEC& value) {
return toTicks(result, value, MS_PER_S);
}
/**
* Converts struct timeval to milliseconds
*/
- static void toMilliseconds(int64_t& result,
- const struct timeval& value) {
+ static void toMilliseconds(int64_t& result, const struct timeval& value) {
return toTicks(result, value, MS_PER_S);
}
@@ -146,7 +144,8 @@ class Util {
*/
static int64_t currentTimeUsec() { return currentTimeTicks(US_PER_S); }
};
-
-}}} // apache::thrift::concurrency
+}
+}
+} // apache::thrift::concurrency
#endif // #ifndef _THRIFT_CONCURRENCY_UTIL_H_
http://git-wip-us.apache.org/repos/asf/thrift/blob/74260aa9/lib/cpp/src/thrift/cxxfunctional.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/cxxfunctional.h b/lib/cpp/src/thrift/cxxfunctional.h
index c24b91b..dadaac3 100644
--- a/lib/cpp/src/thrift/cxxfunctional.h
+++ b/lib/cpp/src/thrift/cxxfunctional.h
@@ -20,6 +20,8 @@
#ifndef _THRIFT_CXXFUNCTIONAL_H_
#define _THRIFT_CXXFUNCTIONAL_H_ 1
+// clang-format off
+
/**
* Loads <functional> from the 'right' location, depending
* on compiler and whether or not it's using C++03 with TR1